Spaces:
Build error
Build error
| # -*- coding: utf-8 -*- | |
| import os | |
| import shutil | |
| import re | |
| import requests | |
| import json | |
| from lxml import html | |
| import traceback | |
| import datetime | |
| import time | |
| from urllib.parse import urlparse | |
| import html as un_html | |
| from selenium.common.exceptions import WebDriverException, NoSuchElementException, JavascriptException | |
| from lxml.html.clean import Cleaner | |
| from modules import g_config, logger | |
| from modules import untils | |
| from modules.browsers import FireFoxBrowser, ChromeUndetectedBrowser | |
| from modules.websites import TruyenFull, system_status | |
| from modules.manga_boto3 import MangaBoto3 | |
# HTML cleaner used to sanitize scraped markup before it is stored.
# javascript=True strips <script> tags; style/embedded content are kept
# (style=False, embedded=False) so chapter pages render as scraped.
cleaner = Cleaner()
cleaner.javascript = True
cleaner.style = False
cleaner.embedded = False
class Manga():
    """In-memory model of one scraped manga plus helpers to fetch its assets."""

    def __init__(self):
        # Identity / presentation fields, filled in by get_info_manga().
        self.slug = None
        self.avatar = None
        self.name = ""
        self.name_alternative = ""
        self.author_name = None
        self.status = None
        self.description = None
        # Content and counters.
        self.episodes = []
        self.tags = []
        self.like_number = 0
        self.view_number = 0
        self.follow_number = 0

    def download_images(self, episode, cookies, domain="https://truyenqqpro.com/"):
        """Download every image of `episode` into resources/<slug>/<episode name>.

        When g_config.TYPE_LINK_IMAGE is not "local", each downloaded file is
        also mirrored to object storage right after it lands on disk.
        """
        use_remote = g_config.TYPE_LINK_IMAGE != "local"
        manga_boto3 = MangaBoto3() if use_remote else None
        slug_episode = self.slug
        path_dir = os.path.join(os.getcwd(), f"resources/{slug_episode}/{episode['name']}")
        os.makedirs(path_dir, exist_ok=True)
        for image in episode['images']:
            parsed = urlparse(image)
            file_name = parsed.path.split('/')[-1]
            path = f"{path_dir}/{file_name}"
            untils.download_file_img(image, path, cookies, domain)
            if use_remote:
                # Remote key layout: <slug>/<chapter number>/<file name>.
                chapter_number = episode['name'].split(" ")[-1]
                manga_boto3.put_resource(path, f"{slug_episode}/{chapter_number}/{file_name}")

    def download_thumb(self, cookies, domain=""):
        """Fetch the manga's avatar image into resources/<slug>/thumb.jpg."""
        path_dir = os.path.join(os.getcwd(), f"resources/{self.slug}")
        os.makedirs(path_dir, exist_ok=True)
        untils.download_file_img(self.avatar, f"{path_dir}/thumb.jpg", cookies, domain)
class Sources:
    """Common base class for site-specific scraper implementations."""

    def __init__(self):
        # No shared state yet; subclasses set up their own configuration.
        pass
class Truyenqqpro(Sources):
    """Scraper for the truyenqq* family of manga sites (selenium-driven)."""

    def __init__(self):
        super().__init__()
        # Current primary domain plus older domains the site has used;
        # queued custom tasks may still reference the old ones.
        self.domain = "https://truyenqqmoi.com"
        self.temp_domains = ["truyenqqvip.com", "truyenqqhot.com"]
        # Chapters on this site are image lists (vs. text chapters elsewhere).
        self.chapter_type = "image"
        self.is_use_selenium = g_config.USE_SELENIUM
| def selenium_get_list_manga_in_category(self, url=None): | |
| url_main = url | |
| stt_page = 1 | |
| retry = 0 | |
| max_retry = 5 | |
| proxy = "" | |
| driver = None | |
| brows = None | |
| while retry < max_retry: | |
| brows = ChromeUndetectedBrowser("", proxy) | |
| try: | |
| brows.init_driver() | |
| driver = brows.get_driver() | |
| break | |
| except KeyboardInterrupt: | |
| if brows is not None: | |
| brows.stop() | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("Truyenqqpro - Error selenium get") | |
| logger.error(traceback.format_exc()) | |
| retry = retry + 1 | |
| time.sleep(3) | |
| continue | |
| if driver is None: | |
| return | |
| while True: | |
| url = f"{url_main}/trang-{stt_page}.html" | |
| item_website = None | |
| try: | |
| retry = 0 | |
| max_retry = 5 | |
| content = None | |
| while retry < max_retry: | |
| try: | |
| driver.get(url) | |
| time.sleep(2) | |
| content = driver.page_source | |
| break | |
| except KeyboardInterrupt: | |
| if brows is not None: | |
| brows.stop() | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error(f"Truyenqqpro - Error selenium get - Url: {url}") | |
| logger.error(traceback.format_exc()) | |
| retry = retry + 1 | |
| time.sleep(3) | |
| if content is None: | |
| logger.error("=" * 20) | |
| logger.error(f"Truyenqqpro - selenium_get_list_manga_in_category - content none - page {stt_page}") | |
| continue | |
| main_div = html.fromstring(content) | |
| list_item = main_div.xpath("//div[@class='list_grid_out']/ul/li") | |
| if len(list_item) == 0: | |
| break | |
| # print(f"total manga: {len(list_item)}") | |
| for item in list_item: | |
| href = None | |
| href_ele = item.xpath("div[1]/a") | |
| if len(href_ele) > 0: | |
| href = href_ele[0].get("href") | |
| if href is None: | |
| continue | |
| item_website = self.selenium_get_episodes(href) | |
| untils.remove_folder_manga(item_website) | |
| except KeyboardInterrupt: | |
| if brows is not None: | |
| brows.stop() | |
| if item_website is not None: | |
| untils.remove_folder_manga(item_website) | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("=" * 20) | |
| logger.error("Truyenqqpro - Error selenium_get_list_manga_in_category") | |
| logger.error(traceback.format_exc()) | |
| continue | |
| stt_page = stt_page + 1 | |
| if brows is not None: | |
| brows.stop() | |
| def selenium_get_episodes(self, url=None, brows=None): | |
| item_manga = Manga() | |
| item_website = None | |
| results = [] | |
| cookies = "" | |
| try: | |
| retry = 0 | |
| max_retry = 5 | |
| content = None | |
| proxy = "" | |
| while retry < max_retry: | |
| try: | |
| driver = brows.get_driver() | |
| driver.get(url) | |
| content = driver.page_source | |
| cookies = driver.get_cookies() | |
| break | |
| except KeyboardInterrupt: | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("Truyenqqpro - Error selenium get") | |
| logger.error(traceback.format_exc()) | |
| retry = retry + 1 | |
| time.sleep(3) | |
| if content is None: | |
| logger.error("=" * 20) | |
| logger.error("Truyenqqpro - selenium_get_episode_detail - content none") | |
| return results | |
| main_div = html.fromstring(content) | |
| cookies = untils.format_to_sure_cookies(cookies) | |
| list_item = main_div.xpath("//div[@class='list_chapter']/div/div") | |
| # print(f"Total chapter: {len(list_item)}") | |
| item_manga = self.get_info_manga(item_manga, main_div) | |
| item_website = untils.get_item_website(item_manga) | |
| # Download thumbnail | |
| item_manga.download_thumb(cookies) | |
| list_chapter_uploaded = item_website.get_list_chapter() | |
| self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded) | |
| logger.info(f"Manga {item_manga.name}") | |
| logger.info(f"Total episodes - {len(item_manga.episodes)}") | |
| if system_status["error"]: | |
| return item_website | |
| for episode in item_manga.episodes: | |
| if self.chapter_type == "image": | |
| logger.info(f"Total images - {len(episode['images'])}") | |
| # item_manga.download_images(episode, cookies) | |
| if g_config.TYPE_LINK_IMAGE == 'local': | |
| untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}")) | |
| untils.update_data_to_website(item_website, episode, self.chapter_type) | |
| except KeyboardInterrupt: | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("=" * 20) | |
| logger.error("Truyenqqpro - Error selenium_get_episodes") | |
| logger.error(traceback.format_exc()) | |
| return item_website | |
| def get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None): | |
| try: | |
| retry = 0 | |
| max_retry = 5 | |
| content = None | |
| while retry < max_retry: | |
| try: | |
| driver = brows.get_driver() | |
| driver.get(url) | |
| content = driver.page_source | |
| break | |
| except KeyboardInterrupt: | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("Truyenqqpro - Error selenium get") | |
| logger.error(traceback.format_exc()) | |
| retry = retry + 1 | |
| time.sleep(3) | |
| main_div = html.fromstring(content) | |
| list_item = main_div.xpath("//div[@class='chapter_content']/div[2]/div") | |
| images = [] | |
| if len(list_item) == 0: | |
| # print(content) | |
| logger.error(f"List item empty") | |
| logger.error(content) | |
| return False | |
| for item in list_item: | |
| """Skip 2 img first""" | |
| img_ele = item.xpath("img") | |
| if len(img_ele) == 0: | |
| continue | |
| src = img_ele[0].get("data-original", None) | |
| if src is None: | |
| continue | |
| images.append(src) | |
| """Skip last img""" | |
| images = images[:-1] | |
| # print(f"images: {len(images)}") | |
| item_manga.episodes.append({"name": name, "images": images}) | |
| time.sleep(2) | |
| except KeyboardInterrupt: | |
| if brows is not None: | |
| brows.stop() | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("=" * 20) | |
| logger.error("Truyenqqpro - Error get_episode_detail") | |
| logger.error(traceback.format_exc()) | |
| return True | |
| def get_info_manga(self, item_manga, main_div): | |
| avatar = "" | |
| name_episode = "" | |
| name_alternative = "" | |
| tags = [] | |
| author_name = "Đang Cập Nhật" | |
| status = "" | |
| like_number = 0 | |
| view_number = 0 | |
| follow_number = 0 | |
| description = "" | |
| slug = "" | |
| name_episode_ele = main_div.xpath("//h1[@itemprop='name']") | |
| if len(name_episode_ele) > 0: | |
| name_episode = name_episode_ele[0].text.strip() | |
| info_ele = main_div.xpath("//div[@class='book_info']/div[2]/div[1]/ul") | |
| if len(info_ele) > 0: | |
| name_alternative_ele = info_ele[0].xpath("li[@class='othername row']/h2") | |
| if len(name_alternative_ele) > 0: | |
| name_alternative = name_alternative_ele[0].text.strip() | |
| author_ele = info_ele[0].xpath("li[@class='author row']/p[2]/a") | |
| if len(author_ele) > 0: | |
| author_name = author_ele[0].text.strip() | |
| status_ele = info_ele[0].xpath("li[@class='status row']/p[2]") | |
| if len(status_ele) > 0: | |
| status = status_ele[0].text.strip() | |
| like_ele = info_ele[0].xpath("li[4]/p[2]") | |
| if len(like_ele) > 0: | |
| like_number = like_ele[0].text.strip() | |
| follow_ele = info_ele[0].xpath("li[5]/p[2]") | |
| if len(follow_ele) > 0: | |
| follow_number = follow_ele[0].text.strip() | |
| view_ele = info_ele[0].xpath("li[6]/p[2]") | |
| if len(view_ele) > 0: | |
| view_number = view_ele[0].text.strip() | |
| info_tag_ele = main_div.xpath("//div[@class='book_info']/div[2]/ul[1]/li") | |
| if len(info_tag_ele) > 0: | |
| for item in info_tag_ele: | |
| tag = item.xpath("a")[0].text.strip() | |
| tags.append(tag) | |
| description_ele = main_div.xpath("//div[@class='book_detail']/div[2]/p") | |
| if len(description_ele) > 0: | |
| for des in description_ele: | |
| description = f"{description}{des.text}\n" | |
| description = description.strip() | |
| if len(description) == 0: | |
| description = f"Truyện tranh {name_episode} được cập nhật nhanh và đầy đủ nhất tại TruyenFull. " \ | |
| f"Bạn đọc đừng quên để lại bình luận và chia sẻ, ủng hộ TruyenFull ra các chương mới " \ | |
| f"nhất của truyện {name_episode}." | |
| avatar_ele = main_div.xpath("//div[@class='book_avatar']/img") | |
| if len(avatar_ele) > 0: | |
| avatar = avatar_ele[0].get("src") | |
| slug_ele = main_div.xpath("//input[@id='slug']") | |
| if len(slug_ele) > 0: | |
| slug = slug_ele[0].get("value") | |
| item_manga.slug = slug | |
| item_manga.avatar = avatar | |
| item_manga.name = name_episode | |
| item_manga.name_alternative = name_alternative | |
| item_manga.author_name = author_name | |
| item_manga.status = status | |
| item_manga.like_number = int(str(like_number).replace(",", "")) | |
| item_manga.follow_number = int(str(follow_number).replace(",", "")) | |
| item_manga.view_number = int(str(view_number).replace(",", "")) | |
| item_manga.tags = tags | |
| item_manga.description = description | |
| return item_manga | |
| def selenium_get_list_manga_newest(self): | |
| url_main = f"{self.domain}/truyen-moi-cap-nhat.html" | |
| stt_page = 1 | |
| retry = 0 | |
| max_retry = 5 | |
| proxy = "" | |
| driver = None | |
| brows = None | |
| while retry < max_retry: | |
| brows = ChromeUndetectedBrowser("", proxy) | |
| try: | |
| brows.init_driver() | |
| driver = brows.get_driver() | |
| break | |
| except KeyboardInterrupt: | |
| if brows is not None: | |
| brows.stop() | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("Truyenqqpro - Error selenium get") | |
| logger.error(traceback.format_exc()) | |
| retry = retry + 1 | |
| time.sleep(3) | |
| continue | |
| if driver is None: | |
| return | |
| while True: | |
| url = f"{url_main}/trang-{stt_page}.html" | |
| item_website = None | |
| try: | |
| retry = 0 | |
| max_retry = 5 | |
| content = None | |
| while retry < max_retry: | |
| try: | |
| driver.get(url) | |
| time.sleep(2) | |
| content = driver.page_source | |
| break | |
| except KeyboardInterrupt: | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error(f"Truyenqqpro - Error selenium get - Url: {url}") | |
| logger.error(traceback.format_exc()) | |
| retry = retry + 1 | |
| time.sleep(3) | |
| if content is None: | |
| logger.error("=" * 20) | |
| logger.error(f"Truyenqqpro - selenium_get_list_manga_newest - content none - page {stt_page}") | |
| continue | |
| main_div = html.fromstring(content) | |
| list_item = main_div.xpath("//div[@class='list_grid_out']/ul/li") | |
| if len(list_item) == 0: | |
| break | |
| # print(f"total manga: {len(list_item)}") | |
| for item in list_item: | |
| href = None | |
| href_ele = item.xpath("div[1]/a") | |
| if len(href_ele) > 0: | |
| href = href_ele[0].get("href") | |
| if href is None: | |
| continue | |
| item_website = self.selenium_get_episodes(href, brows) | |
| untils.remove_folder_manga(item_website) | |
| self.selenium_upload_custom_task() | |
| except KeyboardInterrupt: | |
| if brows is not None: | |
| brows.stop() | |
| if item_website is not None: | |
| untils.remove_folder_manga(item_website) | |
| untils.close_all_chrome_browsers() | |
| raise KeyboardInterrupt | |
| except: | |
| logger.error("=" * 20) | |
| logger.error("Truyenqqpro - Error selenium_get_list_manga_newest") | |
| logger.error(traceback.format_exc()) | |
| continue | |
| stt_page = stt_page + 1 | |
| if brows is not None: | |
| brows.stop() | |
| untils.close_all_chrome_browsers() | |
| def get_all_chapters(self, brows, list_item, item_manga, cookies, list_chapter_uploaded): | |
| limit_chapter = g_config.MAX_NUM_CHAPTER | |
| stt = 0 | |
| time_try = 10 | |
| for item in list_item: | |
| href = None | |
| name = None | |
| stt = stt + 1 | |
| if stt > limit_chapter: | |
| break | |
| href_ele = item.xpath("div[1]/a") | |
| if len(href_ele) > 0: | |
| name = href_ele[0].text.strip() | |
| href = href_ele[0].get("href") | |
| if name is None or href is None: | |
| continue | |
| if name in list_chapter_uploaded: | |
| continue | |
| # print(f"Get details - {name} - href - {href}") | |
| stt_try = 0 | |
| while True: | |
| check = self.get_episode_detail(brows, item_manga, href, name, cookies) | |
| if check is False and stt_try > time_try: | |
| system_status["error"] = True | |
| system_status["message"] = "Không thể bật trình duyệt - đang tiến hành thử lại" | |
| return | |
| if check or stt_try > time_try: | |
| time.sleep(2) | |
| break | |
| stt_try += 1 | |
| time.sleep(2) | |
| def selenium_upload_custom_task(self): | |
| truyenfull = TruyenFull(None, logger) | |
| list_task_customs = truyenfull.get_list_task_customs(self.temp_domains) | |
| if len(list_task_customs) == 0: | |
| return | |
| for item in list_task_customs: | |
| logger.info(f"Start task upload - {item['link_manga']}") | |
| item_website = self.selenium_get_episodes(item['link_manga']) | |
| untils.remove_folder_manga(item_website) | |
| logger.info(f"Done task upload - {item['link_manga']}") | |
| truyenfull.update_task_custom({"id": item['id'], "status": 2}) | |