""" tiktok.py – TikTok scraper using Selenium. Exports: scrape_tiktok(cookie_str, target_username) -> list[dict] Returns structured data per-video: url, profile_username, upload_date, like_count, caption_short, caption_detail, comments, scrape_date cookie_str accepts: 1. Raw string: "sessionid=xxx; tt_webid=yyy; ..." 2. JSON array: [{"name":"sessionid","value":"xxx",...}, ...] 3. JSON object: {"sessionid": "xxx", "tt_webid": "yyy"} """ from __future__ import annotations import json import time from datetime import datetime from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from ._driver import _create_driver # ── Cookie injection ─────────────────────────────────────────────────────────── def _inject_cookies(driver, cookie_str: str) -> bool: driver.get("https://www.tiktok.com/") time.sleep(3) if not cookie_str or not cookie_str.strip(): print("[TikTok] Tidak ada cookie yang diberikan.") return False stripped = cookie_str.strip() if stripped.startswith("["): try: cookies = json.loads(stripped) count = 0 for c in cookies: if not isinstance(c, dict) or "name" not in c: continue safe = {k: c[k] for k in ("name", "value", "domain", "path", "secure", "httpOnly", "expiry") if k in c} safe.setdefault("domain", ".tiktok.com") try: driver.add_cookie(safe) count += 1 except Exception: safe.pop("domain", None) try: driver.add_cookie(safe) count += 1 except Exception: pass driver.refresh() time.sleep(3) return count > 0 except Exception as e: print(f"[TikTok] JSON array error: {e}") if stripped.startswith("{"): try: obj = json.loads(stripped) count = 0 for name, value in obj.items(): try: driver.add_cookie({"name": str(name), "value": str(value), "domain": ".tiktok.com"}) count += 1 except Exception: pass driver.refresh() time.sleep(3) return count > 0 except Exception as e: print(f"[TikTok] JSON object error: {e}") try: count = 0 for item in stripped.split(";"): item = item.strip() if "=" not in item: continue name, _, value = item.partition("=") try: driver.add_cookie({"name": name.strip(), "value": value.strip(), "domain": ".tiktok.com"}) count += 1 except Exception: pass driver.refresh() time.sleep(3) return count > 0 except Exception as e: print(f"[TikTok] String cookie error: {e}") return False # ── Scraping helpers ─────────────────────────────────────────────────────────── _VIDEO_LINK_SELECTORS = [ 'div[data-e2e="user-post-item"] a', 'div[data-e2e="user-post-item-list"] a', 'a[href*="/video/"]', 'div[class*="DivItemContainerV2"] a', 'div[class*="DivWrapper"] a[href*="/video/"]', ] def _get_video_links(driver, profile_url: str, max_videos: int = 30) -> list: print(f"[TikTok] Membuka profil: {profile_url}") driver.get(profile_url) loaded = False for sel in _VIDEO_LINK_SELECTORS: try: WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.CSS_SELECTOR, sel))) loaded = True break except TimeoutException: continue if not loaded: time.sleep(5) links: set = set() stall = 0 while len(links) < max_videos: prev = len(links) for sel in _VIDEO_LINK_SELECTORS: for el in driver.find_elements(By.CSS_SELECTOR, sel): href = el.get_attribute("href") if href and "/video/" in href: links.add(href.split("?")[0]) if len(links) >= max_videos: break driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(3) if len(links) == prev: stall += 1 if stall >= 3: break else: stall = 0 return list(links)[:max_videos] def _scrape_video(driver, video_url: str, profile_username: str) -> dict | None: print(f"[TikTok] Memproses: {video_url}") driver.get(video_url) time.sleep(5) video_data = { "url": video_url, "profile_username": profile_username, "upload_date": "N/A", "like_count": "N/A", "caption_short": "", "caption_detail": "", "comments": [], "scrape_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } try: date_el = WebDriverWait(driver, 8).until( EC.presence_of_element_located((By.CSS_SELECTOR, 'span[data-e2e="browser-video-meta-date"]')) ) video_data["upload_date"] = date_el.text.strip() except TimeoutException: pass try: like_el = driver.find_element(By.CSS_SELECTOR, 'strong[data-e2e="like-count"]') video_data["like_count"] = like_el.text.strip() except NoSuchElementException: pass try: desc_container = WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-e2e='browse-video-desc']")) ) try: cap_el = desc_container.find_element(By.CSS_SELECTOR, 'span[data-e2e="new-desc-span"]') video_data["caption_short"] = cap_el.text.strip() try: more_btn = driver.find_element(By.CSS_SELECTOR, "span[class*='-SpanExpandIcon']") driver.execute_script("arguments[0].click();", more_btn) time.sleep(2) detail_container = WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[class*='DivCustomTDKContainer']")) ) desc_text = "" try: desc_text = detail_container.find_element(By.CSS_SELECTOR, "div[data-e2e='v2t-desc']").text except NoSuchElementException: pass kw_text = "" try: kw_text = detail_container.find_element(By.CSS_SELECTOR, "div[data-e2e='v2t-keywords']").text except NoSuchElementException: pass video_data["caption_detail"] = f"Deskripsi: {desc_text}\nKeywords: {kw_text}".strip() except Exception: pass except NoSuchElementException: pass except TimeoutException: pass try: WebDriverWait(driver, 15).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[class*='DivCommentListContainer']")) ) reply_xpath = "//span[contains(text(), 'balasan') or (contains(text(), 'View') and contains(text(), 'repl'))]" stall = 0 last_count = 0 for _ in range(15): try: btns = driver.find_elements(By.XPATH, reply_xpath) if btns: driver.execute_script("arguments[0].click();", btns[0]) time.sleep(2) stall = 0 continue except Exception: pass driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(3) cur = len(driver.find_elements(By.XPATH, '//div[contains(@class, "DivCommentItemWrapper")]')) if cur > last_count: last_count = cur stall = 0 else: stall += 1 if stall >= 4: break items = driver.find_elements(By.XPATH, '//div[contains(@class, "DivCommentItemWrapper")]') for item in items: try: author_el = item.find_elements(By.XPATH, './/div[@data-e2e="comment-username-1"]//p') if author_el: cat_text = item.find_element(By.XPATH, './/span[@data-e2e="comment-level-1"]').text.strip() if cat_text: video_data["comments"].append({ "author": author_el[0].text.strip(), "comment": cat_text, "replies": [] }) continue # Check for replies (level 2) r_author_el = item.find_elements(By.XPATH, './/div[@data-e2e="comment-username-2"]//p') if r_author_el and video_data["comments"]: r_text = item.find_element(By.XPATH, './/span[@data-e2e="comment-level-2"]').text.strip() if r_text: video_data["comments"][-1]["replies"].append({ "author": r_author_el[0].text.strip(), "comment": r_text }) except Exception: pass except TimeoutException: pass return video_data # ── Public API ───────────────────────────────────────────────────────────────── def scrape_tiktok(cookie_str: str, target_username: str, max_videos: int = 20) -> list: """ Scrape captions & comments from a TikTok profile. Returns: list of dicts with: url, profile_username, upload_date, like_count, caption_short, caption_detail, comments, scrape_date """ if not target_username: print("[TikTok] target_username tidak ada.") return [] username = target_username.lstrip("@") profile_url = f"https://www.tiktok.com/@{username}" driver = _create_driver(mobile=False) all_data: list = [] try: if cookie_str and cookie_str.strip(): _inject_cookies(driver, cookie_str) links = _get_video_links(driver, profile_url, max_videos) for url in links: try: data = _scrape_video(driver, url, username) if data: all_data.append(data) except Exception as e: print(f"[TikTok] Error {url}: {e}") time.sleep(1.5) except Exception as e: print(f"[TikTok] Fatal error: {e}") finally: try: driver.quit() except Exception: pass return all_data