Spaces:
Paused
Paused
import hashlib
import os
from urllib.parse import parse_qs, urlsplit

import requests

from others import *
| api_url = 'https://api.iwara.tv' | |
| file_url = 'https://files.iwara.tv' | |
class BearerAuth(requests.auth.AuthBase):
    """Auth hook for ``requests`` that sends a token as a Bearer credential."""

    def __init__(self, token):
        # Token string placed after the "Bearer " prefix in the header.
        self.token = token

    def __call__(self, r):
        """Set the ``Authorization`` header on request *r* and return it."""
        bearer_value = 'Bearer ' + self.token
        r.headers['Authorization'] = bearer_value
        return r
class ApiClient:
    """Small client for the iwara API: login, listing, and file downloads.

    Args:
        email: Account e-mail sent to the login endpoint.
        password: Account password sent to the login endpoint.
        download_root: Directory under which the ``Thumbnail/<name>`` and
            ``Video/<name>`` folders are created. Defaults to the path that
            was previously hard-coded.
    """

    # Suffix appended to "<file id>_<expires>" before SHA-1 hashing; the hex
    # digest is sent as the X-Version header when listing file resources.
    _VERSION_SALT = "_5nFp9kmbNnHdAFhaqMvt"
    # Resource names to try, best first.
    _QUALITY_PRIORITY = ('Source', '1080', '720', '540', '480', '360')
    # Bytes fetched per iteration while streaming a download to disk.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, email, password, download_root='/home/user/app/Iwara'):
        self.email = email
        self.password = password
        self.api_url = api_url
        self.file_url = file_url
        self.timeout = 30  # seconds, for API calls
        self.download_timeout = 300  # seconds, for streamed file downloads
        self.download_root = download_root
        self.token = None  # set by login() on success

    def _auth(self):
        """Return a ``BearerAuth`` for the current token, or ``None`` if not logged in."""
        if self.token is None:
            return None
        return BearerAuth(self.token)

    @staticmethod
    def _format_title(name, title):
        """Build the "<name> - <normalised title>" string used as the file stem."""
        cleaned = (
            title.replace(' / ', ' ')
            .replace('/', ' ')
            .replace('_', ' ')
            .title()
            .replace('’S', '’s')
            .replace('Mmd', 'MMD')
            .replace('Nikke', 'NIKKE')
            .replace('Fate', 'FATE')
            .replace('】', '】 ')
            # Collapse the double space that the previous step creates when
            # the bracket was already followed by a space.
            .replace('  ', ' ')
        )
        return f"{name} - {cleaned}"

    @staticmethod
    def _describe(video):
        """Print name/username/title of *video* and return ``(name, file_stem)``."""
        title = video['title']
        user = video['user']
        name = user['name']
        print("Name:", name)
        print("Username:", user['username'])
        print("Title:", title)
        return name, ApiClient._format_title(name, title)

    @staticmethod
    def _extract_expires(url):
        """Return the value of the ``expires`` query parameter of *url*.

        Raises:
            ValueError: If the URL carries no ``expires`` parameter.
        """
        values = parse_qs(urlsplit(url).query).get('expires')
        if not values:
            raise ValueError(f"No 'expires' parameter in file URL: {url}")
        return values[0]

    @staticmethod
    def _select_resource(resources):
        """Return the resource whose name ranks highest in ``_QUALITY_PRIORITY``.

        Raises:
            Exception: If none of the resources has an accepted name.
        """
        by_name = {resource['name']: resource for resource in resources}
        for quality in ApiClient._QUALITY_PRIORITY:
            if quality in by_name:
                return by_name[quality]
        raise Exception("No video with acceptable quality found")

    def _stream_to_file(self, url, destination):
        """Stream *url* into *destination*, showing a tqdm progress bar.

        Data is written to ``<destination>.part`` and renamed only after the
        transfer finished, so an interrupted download never leaves a file
        that a later run would mistake for a completed one.

        Raises:
            requests.RequestException: On connection errors, timeouts, or a
                non-success HTTP status.
            OSError: If the file cannot be written.
        """
        partial = destination + '.part'
        try:
            with requests.get(url, stream=True, timeout=self.download_timeout) as response:
                # Fail instead of saving an HTTP error body as media.
                response.raise_for_status()
                total_size = int(response.headers.get('Content-Length', 0))
                with open(partial, 'wb') as f, \
                        tqdm(total=total_size, unit='B', unit_scale=True, ncols=80) as progress_bar:
                    for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
                            progress_bar.update(len(chunk))
            os.replace(partial, destination)
        except BaseException:
            # Also covers Ctrl-C: never keep a half-written file around.
            if os.path.exists(partial):
                os.remove(partial)
            raise

    def login(self) -> "requests.Response":
        """POST the credentials to ``/user/login`` and store the returned token.

        On failure a message is printed and ``self.token`` is left unchanged.

        Returns:
            The raw login response.
        """
        url = self.api_url + '/user/login'
        payload = {'email': self.email, 'password': self.password}
        r = requests.post(url, json=payload, timeout=self.timeout)
        try:
            self.token = r.json()['token']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or JSON without a 'token' entry.
            print('API Login failed')
        else:
            print('API Login success')
        return r

    def get_videos(self, sort='date', rating='all', page=0, limit=32, subscribed=False) -> "requests.Response":
        """GET ``/videos`` with the given listing parameters.

        The request is authenticated only when a token is available.

        Returns:
            The raw response.
        """
        url = self.api_url + '/videos'
        params = {
            'sort': sort,
            'rating': rating,
            'page': page,
            'limit': limit,
            'subscribed': 'true' if subscribed else 'false',
        }
        return requests.get(url, params=params, auth=self._auth(), timeout=self.timeout)

    def get_video(self, video_id) -> "requests.Response":
        """GET ``/video/<video_id>``, authenticated when a token is available.

        Returns:
            The raw response.
        """
        url = self.api_url + '/video/' + video_id
        return requests.get(url, auth=self._auth(), timeout=self.timeout)

    def download_video_thumbnail(self, video_id) -> str:
        """Download the thumbnail of *video_id* unless it already exists.

        Returns:
            Path of the thumbnail file.

        Raises:
            requests.RequestException: If the thumbnail request fails or
                returns a non-success HTTP status.
        """
        video = self.get_video(video_id).json()
        file_id = video['file']['id']
        thumbnail_id = video['thumbnail']
        url = self.file_url + '/image/original/' + file_id + '/thumbnail-{:02d}.jpg'.format(thumbnail_id)
        name, judul = self._describe(video)
        folder_download = f"{self.download_root}/Thumbnail/{name}"
        os.makedirs(folder_download, exist_ok=True)
        thumbnail_file_name = os.path.join(folder_download, judul + '.jpg')
        if os.path.exists(thumbnail_file_name):
            print(f"Video ID {video_id} thumbnail already downloaded, skipped downloading.")
            return thumbnail_file_name
        print(f"Downloading thumbnail for video ID: {video_id} ...")
        self._stream_to_file(url, thumbnail_file_name)
        return thumbnail_file_name

    def download_video(self, video_id) -> str:
        """Download the best available rendition of *video_id* unless it exists.

        Returns:
            Path of the video file.

        Raises:
            Exception: If the video info cannot be fetched, no accepted
                quality is offered, or the download itself fails.
        """
        try:
            video = self.get_video(video_id).json()
        except Exception as e:
            raise Exception(f"Failed to get video info for video ID: {video_id}, error: {e}") from e
        url = video['fileUrl']
        file_id = video['file']['id']
        expires = self._extract_expires(url)
        sha_key = file_id + "_" + expires + self._VERSION_SALT
        version_hash = hashlib.sha1(sha_key.encode('utf-8')).hexdigest()
        headers = {"X-Version": version_hash}
        resources = requests.get(url, headers=headers, auth=self._auth(), timeout=self.timeout).json()
        name, judul = self._describe(video)
        folder_download = f"{self.download_root}/Video/{name}"
        os.makedirs(folder_download, exist_ok=True)
        download_resource = self._select_resource(resources)
        download_link = "https:" + download_resource['src']['download']
        # 'type' looks like "<major>/<subtype>"; the subtype is used as extension.
        file_type = download_resource['type'].split('/')[1]
        video_file_name = os.path.join(folder_download, judul + '.' + file_type)
        if os.path.exists(video_file_name):
            print(f"Video ID {video_id} already downloaded, skipped downloading.")
            return video_file_name
        print(f"Downloading video ID: {video_id} ...")
        try:
            self._stream_to_file(download_link, video_file_name)
        except Exception as e:
            raise Exception(f"Failed to download video ID: {video_id}, error: {e}") from e
        return video_file_name
| # def process_txt(txt_files, baris = 1): | |
| # for txt_file in txt_files: | |
| # file_path = os.path.join('/content', txt_file) | |
| # with open(file_path, 'r') as f: | |
| # links = f.read().splitlines() | |
| # for link in links: | |
| # if link.strip(): | |
| # print(f"Baris: {baris}") | |
| # video_id = link.split("/")[4] | |
| # client.download_video(video_id) | |
| # baris += 1 | |
| # print() | |
| # os.remove(file_path) | |
| # ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
def iwara(video_url):
    """Download a video and its thumbnail from a video page URL, then convert it.

    A module-level ``client`` is created and logged in on first use and
    reused by later calls.

    Args:
        video_url: Page URL whose fifth ``/``-separated component is the
            video ID.

    Returns:
        Tuple ``(video_file, judul, video_info, thumbnail_file)`` where
        ``video_file`` is whatever ``convert_videos`` returned, ``judul`` is
        that file's name without extension, and ``video_info`` is a short
        text line containing the title.
    """
    # Without this declaration the assignment below made ``client`` a local:
    # it was never cached, and an existing global caused UnboundLocalError.
    global client
    if 'client' not in globals():
        # SECURITY: credentials are hard-coded in source. They are kept as
        # fallbacks for backward compatibility; prefer supplying them via
        # the IWARA_EMAIL / IWARA_PASSWORD environment variables.
        client = ApiClient(
            email=os.environ.get("IWARA_EMAIL", "AnimuHikari"),
            password=os.environ.get("IWARA_PASSWORD", "Hikari123"),
        )
        client.login()
    video_id = video_url.split("/")[4]
    video_file = client.download_video(video_id)
    thumbnail_file = client.download_video_thumbnail(video_id)
    # Convert the video (first argument 720 is passed through to
    # convert_videos; presumably the target height — confirm in `others`).
    video_file = convert_videos(720, video_file)
    judul = os.path.splitext(os.path.basename(video_file))[0]
    video_info = f"Judul: {judul}\n"
    return video_file, judul, video_info, thumbnail_file