Spaces:
Sleeping
Sleeping
| import requests, hashlib, os | |
| from others import * | |
# Base URL of the iwara.tv JSON API (login, video listings, video metadata).
api_url = "https://api.iwara.tv"
# Base URL of the iwara.tv file server (thumbnails).
file_url = "https://files.iwara.tv"
class BearerAuth(requests.auth.AuthBase):
    """``requests`` auth hook that adds an ``Authorization: Bearer`` header."""

    def __init__(self, token):
        # Token string that is appended after the "Bearer " prefix.
        self.token = token

    def __call__(self, r):
        # Set the header on the request being prepared and hand the same
        # request object back.
        header_value = 'Bearer ' + self.token
        r.headers['Authorization'] = header_value
        return r
class ApiClient:
    """Small client for the iwara.tv JSON API and file server.

    Stores the account credentials, the API/file base URLs, the request
    timeouts and, after a successful :meth:`login`, the bearer token that is
    attached to later requests.
    """

    # Suffix mixed into the SHA-1 digest sent as the ``X-Version`` header.
    # IMPORTANT: This might change in the future.
    _SHA_POSTFIX = "_5nFp9kmbNnHdAFhaqMvt"

    # Number of bytes written to disk per streamed chunk.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, email, password):
        """Remember the credentials; no network request is made here."""
        self.email = email
        self.password = password

        # API
        self.api_url = api_url
        self.file_url = file_url
        self.timeout = 30            # passed as ``timeout`` to API requests
        self.download_timeout = 300  # passed as ``timeout`` to file downloads
        self.token = None            # set by login() on success

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _auth(self):
        """Return a ``BearerAuth`` for the stored token, or ``None`` if not logged in."""
        if self.token is None:
            return None
        return BearerAuth(self.token)

    @staticmethod
    def _x_version(file_id, signed_url):
        """Compute the ``X-Version`` header value for a signed file URL.

        The value is the SHA-1 hex digest of ``<file_id>_<expires>`` followed
        by ``_SHA_POSTFIX``.  ``expires`` is the value of the first query
        parameter found in the fifth ``/``-separated segment of
        ``signed_url`` (i.e. ``scheme://host/<x>/<id>?<name>=<expires>&...``).

        Raises:
            IndexError: if ``signed_url`` does not have that shape.
        """
        expires = signed_url.split('/')[4].split('?')[1].split('&')[0].split('=')[1]
        sha_key = file_id + "_" + expires + ApiClient._SHA_POSTFIX
        return hashlib.sha1(sha_key.encode('utf-8')).hexdigest()

    @staticmethod
    def _pick_source(resources):
        """Return the last entry whose ``name`` is ``'Source'``, or ``None``."""
        chosen = None
        for resource in resources:
            if resource['name'] == 'Source':
                chosen = resource
        return chosen

    def _download_to_file(self, url, file_name):
        """Stream ``url`` into ``file_name``.

        The response is streamed so the body is never held in memory as a
        whole, an HTTP error status raises before the file is created, and a
        partially written file is removed if anything fails.
        """
        try:
            with requests.get(url, stream=True, timeout=self.download_timeout) as response:
                response.raise_for_status()
                with open(file_name, "wb") as f:
                    for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
        except BaseException:
            # Do not leave a truncated file behind: the "already downloaded"
            # checks below would otherwise accept it on the next call.
            try:
                os.remove(file_name)
            except OSError:
                pass
            raise

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def login(self) -> "requests.Response":
        """POST the credentials to ``/user/login`` and store the returned token.

        Prints whether a ``token`` field could be read from the JSON reply.
        The raw response is returned in both cases; ``self.token`` is only
        updated on success.
        """
        url = self.api_url + '/user/login'
        payload = {'email': self.email, 'password': self.password}
        r = requests.post(url, json=payload, timeout=self.timeout)
        try:
            self.token = r.json()['token']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or the JSON has no usable 'token' entry.
            print('API Login failed')
        else:
            print('API Login success')
        return r

    # limit query is not working
    def get_videos(self, sort = 'date', rating = 'all', page = 0, limit = 32, subscribed = False) -> "requests.Response":
        """Request a page of the video listing from ``/videos``.

        - sort: date, trending, popularity, views, likes
        - rating: all, general, ecchi
        - page, limit: sent unchanged as query parameters
        - subscribed: sent as the string ``'true'`` or ``'false'``

        The bearer token is attached when logged in.  Returns the raw
        response.
        """
        url = self.api_url + '/videos'
        params = {
            'sort': sort,
            'rating': rating,
            'page': page,
            'limit': limit,
            'subscribed': 'true' if subscribed else 'false',
        }
        r = requests.get(url, params=params, auth=self._auth(), timeout=self.timeout)

        #Debug
        print("[DEBUG] get_videos response:", r)
        return r

    def get_video(self, video_id) -> "requests.Response":
        """Request the metadata of one video from ``/video/<video_id>``.

        The bearer token is attached when logged in.  Returns the raw
        response.
        """
        url = self.api_url + '/video/' + video_id
        r = requests.get(url, auth=self._auth(), timeout=self.timeout)

        #Debug
        print("[DEBUG] get_video response:", r)
        return r

    def download_video_thumbnail(self, video_id) -> str:
        """Download the thumbnail of a video to ``<video_id>.jpg``.

        If that file already exists nothing is requested.  Returns the file
        name.

        Raises:
            requests.RequestException: on network errors or an HTTP error
                status of the thumbnail request.
        """
        thumbnail_file_name = video_id + '.jpg'
        if os.path.exists(thumbnail_file_name):
            print(f"Video ID {video_id} thumbnail already downloaded, skipped downloading. ")
            return thumbnail_file_name

        video = self.get_video(video_id).json()
        file_id = video['file']['id']
        thumbnail_id = video['thumbnail']
        url = self.file_url + '/image/original/' + file_id + f'/thumbnail-{thumbnail_id:02d}.jpg'

        print(f"Downloading thumbnail for video ID: {video_id} ...")
        self._download_to_file(url, thumbnail_file_name)
        return thumbnail_file_name

    def download_video(self, video_id) -> str:
        """Download the ``Source`` quality file of a video.

        The file is saved as ``<video_id>.<subtype>`` where ``subtype`` is
        taken from the resource's ``type`` field (the part after ``/``).  If
        that file already exists the download is skipped.  Returns the file
        name.

        Raises:
            Exception: if the video info cannot be fetched, if no resource
                named ``Source`` is listed, or if the download fails.
        """
        try:
            video = self.get_video(video_id).json()
        except Exception as e:
            raise Exception(f"Failed to get video info for video ID: {video_id}, error: {e}") from e

        #Debug
        print(video)

        url = video['fileUrl']
        file_id = video['file']['id']
        headers = {"X-Version": self._x_version(file_id, url)}

        # Attach the token only when logged in; building BearerAuth around a
        # missing token would fail while the request is being prepared.
        resources = requests.get(url, headers=headers, auth=self._auth(), timeout=self.timeout).json()

        #Debug
        print(resources)

        resource = self._pick_source(resources)
        if resource is None:
            raise Exception("No video with Source quality found")

        #Debug
        print(resource)

        download_link = "https:" + resource['src']['download']
        file_type = resource['type'].split('/')[1]
        video_file_name = video_id + '.' + file_type

        if os.path.exists(video_file_name):
            print(f"Video ID {video_id} Already downloaded, skipped downloading. ")
            return video_file_name

        print(f"Downloading video ID: {video_id} ...")
        try:
            self._download_to_file(download_link, video_file_name)
        except Exception as e:
            raise Exception(f"Failed to download video ID: {video_id}, error: {e}") from e
        return video_file_name
| # ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
| # ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
# Download a video from iwara.tv.
# Usage: python iwara [url]
# By AngelBottomless @ GitHub
# Downloads the video from an iwara page.
| import requests | |
| # use selenium to get video url | |
| from selenium import webdriver | |
| import argparse | |
def download_video(url, timeout=300):
    """Download the video shown on an iwara page into the current directory.

    A Chrome session is opened on ``url``, the accept and play buttons are
    clicked, and the ``src`` of the video element is fetched with
    ``requests``.  The file is named after the last ``/``-separated segment
    of ``url`` plus ``.mp4``.

    Args:
        url: address of the iwara video page.
        timeout: value passed as ``timeout`` to the download request.

    Raises:
        ValueError: if the video element has no ``src``.
        requests.RequestException: on network errors or an HTTP error status.
        Any exception raised by the browser automation steps.
    """
    # save video to local
    filename = url.split('/')[-1] + '.mp4'

    driver = run_webdriver(url)
    try:
        click_accept(driver)
        driver.implicitly_wait(2)
        click_play(driver)
        video_url = find_video_url(driver)
        if not video_url:
            raise ValueError('video url not found on page')

        # Stream the body to disk instead of holding the whole video in
        # memory, and fail on an HTTP error before creating the file.
        with requests.get(video_url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=64 * 1024):
                    if chunk:
                        f.write(chunk)
    finally:
        # Always end the browser session, also when a step above raised.
        driver.quit()
def download_with_retry(url, retry=3):
    """Call ``download_video(url)`` up to ``retry`` times.

    Returns ``True`` as soon as one attempt finishes without raising and
    ``False`` when every attempt failed (or ``retry`` is not positive).
    Only ``Exception`` subclasses trigger a retry, so Ctrl-C and
    ``SystemExit`` still stop the program.
    """
    for _ in range(retry):
        try:
            download_video(url)
        except Exception:
            print('download failed, retrying...')
        else:
            return True
    return False
def run_webdriver(url):
    """Start Chrome with audio muted, open ``url`` and return the driver.

    An implicit wait of 4 seconds is set on the driver after navigation.
    If opening the page fails, the browser session is shut down before the
    exception is re-raised, so no Chrome process is left running.
    """
    # mute chrome
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--mute-audio")

    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(url)
        driver.implicitly_wait(4)
    except BaseException:
        # The caller never receives the driver in this case, so it has to be
        # cleaned up here.
        driver.quit()
        raise
    return driver
def click_accept(driver):
    """Click the button located at a fixed absolute XPath.

    Presumably this is the "accept" button of a dialog shown on page load
    (inferred from the function name) -- confirm against the live page.
    """
    accept_xpath = '/html/body/div[3]/div/div[2]/button[1]'
    driver.find_element('xpath', accept_xpath).click()
def click_play(driver):
    """Click the button that is a direct child of the ``vjs_video_3`` element."""
    play_xpath = '//*[@id="vjs_video_3"]/button'
    driver.find_element('xpath', play_xpath).click()
def find_video_url(driver):
    """Return the ``src`` attribute of the ``vjs_video_3_html5_api`` element."""
    player_xpath = '//*[@id="vjs_video_3_html5_api"]'
    return driver.find_element('xpath', player_xpath).get_attribute('src')
def track_clipboard():
    """Poll the clipboard once per second and download iwara URLs found in it.

    Whenever the clipboard text changes and contains ``iwara.tv``, a child
    process ``<current interpreter> -m iwara <text>`` is started in the
    background.  The loop runs until interrupted with Ctrl-C, at which point
    ``0`` is returned.
    """
    import subprocess
    import sys
    import time

    import pyperclip

    print('tracking clipboard...')
    previous = ''
    children = []  # background downloads that have not finished yet
    try:
        while True:
            clipboard = pyperclip.paste()
            if clipboard != previous:
                if 'iwara.tv' in clipboard:
                    print('url detected, downloading...')
                    # Use the interpreter running this script rather than
                    # whatever "python" resolves to on PATH, and drop
                    # surrounding whitespace copied along with the URL.
                    children.append(
                        subprocess.Popen([sys.executable, '-m', 'iwara', clipboard.strip()])
                    )
                    # The child has only been started at this point.
                    print('download started in background')
                previous = clipboard
            # Reap children that have exited so they do not pile up.
            children = [child for child in children if child.poll() is None]
            time.sleep(1)
    except KeyboardInterrupt:
        print('exiting...')
        return 0
if __name__ == '__main__':
    import sys

    # Bookkeeping for the URLs handled by this invocation.
    failed_urls = []
    success_urls = set()

    # Command line: an optional positional url, or -t/--track to watch the
    # clipboard instead (in which case the url is not required).
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--track', action='store_true', help='track clipboard for iwara url')
    parser.add_argument('url', nargs='?', default='', help='iwara url')
    args = parser.parse_args()

    if args.track:
        track_clipboard()
    elif 'iwara.tv' not in args.url:
        print('invalid url')
        sys.exit(1)
    else:
        if download_with_retry(args.url):
            print('download complete')
            success_urls.add(args.url)
        else:
            print('download failed')
            failed_urls.append(args.url)

        if failed_urls:
            print('failed urls:')
            for url in failed_urls:
                print(url)
                # Append each failed url to ./failed.txt
                with open('failed.txt', 'a') as f:
                    f.write(url + '\n')
            sys.exit(1)
| # ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
def iwara(video_url, judul):
    """Download a video and a thumbnail and convert the video.

    Args:
        video_url: address handed to ``download_file`` for the video.
        judul: raw title; it is normalised (underscores and slashes become
            spaces, title-cased, a few names upper-cased) before use.

    Returns:
        A tuple ``(video_file, judul, video_info, thumbnail_file)`` where
        ``judul`` is the normalised title and ``video_info`` is the text
        ``"Judul: <judul>\\n"``.

    ``download_file`` and ``convert_videos`` are not defined in this part of
    the file -- presumably they come from ``from others import *``; verify.
    """
    # Target directory for the downloaded files.
    directory = "/home/user/app/Iwara"
    # exist_ok avoids the race between checking for the directory and
    # creating it.
    os.makedirs(directory, exist_ok=True)

    # NOTE(review): as written, the final replace(' ', ' ') swaps a space for
    # a space and changes nothing; it may have been meant to collapse double
    # spaces -- confirm against the original file.
    judul = judul.replace('_',' ').title().replace('Mmd','MMD').replace('/',' ').replace('Nikke','NIKKE').replace('Fate','FATE').replace('】','】 ').replace(' ', ' ')

    # The same fixed image is used as thumbnail for every video.
    thumbnail_url = 'https://saradahentai.com/wp-content/uploads/2023/03/Live-Footage-of-Ashley-Graham-Captured-by-fugtrup-Resident-Evil-4.jpg'
    thumbnail_file = download_file(thumbnail_url, judul, directory)

    video_file = download_file(video_url, judul, directory)

    # Convert the video
    video_file = convert_videos(720, video_file)

    video_info = f"Judul: {judul}\n"
    return video_file, judul, video_info, thumbnail_file