# Sentiment/services/tiktok.py
# Commit fa8ff66 (NzTama): "Initial clean deploy: Sentiment Analysis"
"""
tiktok.py – TikTok scraper using Selenium.

Exports:
    scrape_tiktok(cookie_str, target_username, max_videos=20) -> list[dict]

Returns one dict per video with the keys:
    url, profile_username, upload_date, like_count,
    caption_short, caption_detail, comments, scrape_date

cookie_str accepts:
    1. Raw string:  "sessionid=xxx; tt_webid=yyy; ..."
    2. JSON array:  [{"name":"sessionid","value":"xxx",...}, ...]
    3. JSON object: {"sessionid": "xxx", "tt_webid": "yyy"}
"""
from __future__ import annotations

import json
import time
from datetime import datetime

from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from ._driver import _create_driver
# ── Cookie injection ───────────────────────────────────────────────────────────
# Keys copied from a JSON-array cookie entry; anything else (e.g. "sameSite",
# "hostOnly", "storeId" from browser-extension exports) is dropped.
_COOKIE_FIELDS = ("name", "value", "domain", "path", "secure", "httpOnly", "expiry")


def _parse_cookie_str(cookie_str: str) -> list[dict]:
    """Convert a cookie string in any supported format into cookie dicts.

    Supported formats (chosen by the first non-blank character):
      * ``[`` – JSON array of cookie objects; entries that are not dicts or
        have no ``"name"`` are skipped.
      * ``{`` – JSON object mapping cookie name to value.
      * anything else – raw ``name=value; name2=value2`` header string.

    Text that starts like JSON but does not parse is reported and then
    handled as a raw string. This function never touches the browser.
    """
    stripped = cookie_str.strip()

    if stripped.startswith("["):
        try:
            entries = json.loads(stripped)
        except ValueError as e:
            print(f"[TikTok] JSON array error: {e}")
        else:
            cookies = []
            for entry in entries:
                if not isinstance(entry, dict) or "name" not in entry:
                    continue
                cookie = {k: entry[k] for k in _COOKIE_FIELDS if k in entry}
                cookie.setdefault("domain", ".tiktok.com")
                if "expiry" in cookie:
                    # WebDriver's Add Cookie command only accepts an integer
                    # expiry; exports often carry a float or null instead.
                    try:
                        cookie["expiry"] = int(cookie["expiry"])
                    except (TypeError, ValueError, OverflowError):
                        del cookie["expiry"]
                cookies.append(cookie)
            return cookies

    if stripped.startswith("{"):
        try:
            mapping = json.loads(stripped)
        except ValueError as e:
            print(f"[TikTok] JSON object error: {e}")
        else:
            return [
                {"name": str(name), "value": str(value), "domain": ".tiktok.com"}
                for name, value in mapping.items()
            ]

    cookies = []
    for item in stripped.split(";"):
        name, sep, value = item.strip().partition("=")
        name = name.strip()
        if not sep or not name:
            continue
        cookies.append({"name": name, "value": value.strip(), "domain": ".tiktok.com"})
    return cookies


def _add_cookie(driver, cookie: dict) -> bool:
    """Add one cookie, retrying without ``domain`` if the first attempt fails.

    Returns True when either attempt succeeds. Failures are swallowed on
    purpose: one bad cookie must not stop the others from being injected.
    """
    try:
        driver.add_cookie(cookie)
        return True
    except Exception:
        pass
    if "domain" not in cookie:
        return False
    without_domain = {k: v for k, v in cookie.items() if k != "domain"}
    try:
        driver.add_cookie(without_domain)
        return True
    except Exception:
        return False


def _inject_cookies(driver, cookie_str: str, settle_seconds: float = 3) -> bool:
    """Log the browser session in by injecting TikTok cookies.

    Args:
        driver: Selenium WebDriver (only ``get``, ``add_cookie`` and
            ``refresh`` are used).
        cookie_str: Cookies as a raw header string, a JSON array or a JSON
            object (see ``_parse_cookie_str``).
        settle_seconds: Pause after loading and after refreshing the page.

    Returns:
        True if at least one cookie was accepted by the browser, else False.
        An empty or blank ``cookie_str`` returns False without navigating.
    """
    if not cookie_str or not cookie_str.strip():
        print("[TikTok] Tidak ada cookie yang diberikan.")
        return False

    # Cookies can only be set for the domain that is currently loaded.
    driver.get("https://www.tiktok.com/")
    time.sleep(settle_seconds)

    count = 0
    try:
        for cookie in _parse_cookie_str(cookie_str):
            if _add_cookie(driver, cookie):
                count += 1
        # Reload so the page is rendered with the new cookies applied.
        driver.refresh()
        time.sleep(settle_seconds)
    except Exception as e:
        print(f"[TikTok] Cookie injection error: {e}")
        return False
    return count > 0
# ── Scraping helpers ───────────────────────────────────────────────────────────
# CSS selectors that `_get_video_links` uses to find the anchors of video
# posts on a profile page. Several alternatives are listed, ranging from
# `data-e2e` attribute hooks to class-name fragments and a bare href match.
_VIDEO_LINK_SELECTORS = [
    # `data-e2e` attribute hooks
    'div[data-e2e="user-post-item"] a',
    'div[data-e2e="user-post-item-list"] a',
    # any anchor whose href contains "/video/"
    'a[href*="/video/"]',
    # class-name fragments
    'div[class*="DivItemContainerV2"] a',
    'div[class*="DivWrapper"] a[href*="/video/"]',
]
def _get_video_links(driver, profile_url: str, max_videos: int = 30) -> list:
    """Collect up to ``max_videos`` video URLs from a profile page.

    Opens ``profile_url``, waits for any of ``_VIDEO_LINK_SELECTORS`` to
    appear, then alternates between harvesting links and scrolling to the
    bottom of the page. Collection stops once enough links are found or
    after three consecutive scrolls that yield nothing new.

    Args:
        driver: Selenium WebDriver.
        profile_url: Full URL of the TikTok profile.
        max_videos: Maximum number of links to return.

    Returns:
        Unique video URLs with the query string removed, in the order
        ``find_elements`` returned them (first seen first).
    """
    print(f"[TikTok] Membuka profil: {profile_url}")
    driver.get(profile_url)

    # A comma-separated selector list matches as soon as ANY alternative is
    # present, so one wait replaces a sequence of per-selector timeouts.
    any_selector = ", ".join(_VIDEO_LINK_SELECTORS)
    try:
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, any_selector))
        )
    except TimeoutException:
        # Nothing matched yet: give the page a last grace period.
        time.sleep(5)

    # dict keys keep insertion order, so the final cut keeps the links that
    # were seen first instead of an arbitrary subset of a set.
    links: dict = {}
    stall = 0
    while len(links) < max_videos:
        before = len(links)
        for el in driver.find_elements(By.CSS_SELECTOR, any_selector):
            try:
                href = el.get_attribute("href")
            except StaleElementReferenceException:
                # The page re-rendered this node; it is picked up again on
                # the next pass if it still exists.
                continue
            if href and "/video/" in href:
                links.setdefault(href.split("?")[0], None)
                if len(links) >= max_videos:
                    break

        if len(links) >= max_videos:
            # Quota reached: skip the pointless extra scroll and sleep.
            break

        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3)

        if len(links) == before:
            stall += 1
            if stall >= 3:
                break
        else:
            stall = 0

    return list(links)[:max_videos]
def _read_caption_detail(driver) -> str:
    """Expand the caption and return its detail text.

    Clicks the expand icon, waits for the detail container, and combines its
    description and keywords parts (each defaults to "" when absent). Any
    other failure propagates to the caller, which treats it as "no detail".
    """
    expand_icon = driver.find_element(By.CSS_SELECTOR, "span[class*='-SpanExpandIcon']")
    driver.execute_script("arguments[0].click();", expand_icon)
    time.sleep(2)
    panel = WebDriverWait(driver, 5).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "div[class*='DivCustomTDKContainer']"))
    )

    parts = []
    for selector in ("div[data-e2e='v2t-desc']", "div[data-e2e='v2t-keywords']"):
        try:
            parts.append(panel.find_element(By.CSS_SELECTOR, selector).text)
        except NoSuchElementException:
            parts.append("")
    desc_text, kw_text = parts
    return f"Deskripsi: {desc_text}\nKeywords: {kw_text}".strip()


def _load_comment_threads(driver) -> None:
    """Expand reply threads and scroll so more comments get loaded.

    Runs at most 15 rounds. A round either clicks the first visible
    "view replies" control, or scrolls to the bottom and counts comment
    items; four scroll rounds in a row without growth end the loop early.
    """
    reply_xpath = "//span[contains(text(), 'balasan') or (contains(text(), 'View') and contains(text(), 'repl'))]"
    item_xpath = '//div[contains(@class, "DivCommentItemWrapper")]'
    idle_rounds = 0
    seen = 0
    for _ in range(15):
        try:
            reply_buttons = driver.find_elements(By.XPATH, reply_xpath)
            if reply_buttons:
                driver.execute_script("arguments[0].click();", reply_buttons[0])
                time.sleep(2)
                idle_rounds = 0
                continue
        except Exception:
            # Best effort: fall through to scrolling if the click failed.
            pass

        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3)
        loaded = len(driver.find_elements(By.XPATH, item_xpath))
        if loaded > seen:
            seen = loaded
            idle_rounds = 0
            continue
        idle_rounds += 1
        if idle_rounds >= 4:
            return


def _collect_comments(driver, comments: list) -> None:
    """Append every loaded comment to ``comments`` (mutated in place).

    A top-level item becomes ``{"author", "comment", "replies": []}``; a
    reply item is attached to the most recently appended top-level comment.
    Items that fail to parse are skipped.
    """
    for item in driver.find_elements(By.XPATH, '//div[contains(@class, "DivCommentItemWrapper")]'):
        try:
            top_author = item.find_elements(By.XPATH, './/div[@data-e2e="comment-username-1"]//p')
            if top_author:
                body = item.find_element(By.XPATH, './/span[@data-e2e="comment-level-1"]').text.strip()
                if body:
                    comments.append({
                        "author": top_author[0].text.strip(),
                        "comment": body,
                        "replies": [],
                    })
                continue

            # Not a top-level comment: check whether it is a level-2 reply.
            reply_author = item.find_elements(By.XPATH, './/div[@data-e2e="comment-username-2"]//p')
            if reply_author and comments:
                reply_body = item.find_element(By.XPATH, './/span[@data-e2e="comment-level-2"]').text.strip()
                if reply_body:
                    comments[-1]["replies"].append({
                        "author": reply_author[0].text.strip(),
                        "comment": reply_body,
                    })
        except Exception:
            pass


def _scrape_video(driver, video_url: str, profile_username: str) -> dict | None:
    """Open one video page and extract its metadata, caption and comments.

    Every field is best effort: a missing element leaves the default
    ("N/A" for date and likes, "" for captions, [] for comments).

    Args:
        driver: Selenium WebDriver.
        video_url: URL of the video page.
        profile_username: Stored verbatim in the result.

    Returns:
        Dict with url, profile_username, upload_date, like_count,
        caption_short, caption_detail, comments and scrape_date. This body
        always returns the dict; it never produces None.
    """
    print(f"[TikTok] Memproses: {video_url}")
    driver.get(video_url)
    time.sleep(5)

    record = dict(
        url=video_url,
        profile_username=profile_username,
        upload_date="N/A",
        like_count="N/A",
        caption_short="",
        caption_detail="",
        comments=[],
        scrape_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    )

    # Upload date
    try:
        record["upload_date"] = WebDriverWait(driver, 8).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, 'span[data-e2e="browser-video-meta-date"]'))
        ).text.strip()
    except TimeoutException:
        pass

    # Like count
    try:
        record["like_count"] = driver.find_element(
            By.CSS_SELECTOR, 'strong[data-e2e="like-count"]'
        ).text.strip()
    except NoSuchElementException:
        pass

    # Caption: the short text first, then the expanded detail if available.
    try:
        desc_box = WebDriverWait(driver, 5).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-e2e='browse-video-desc']"))
        )
        try:
            record["caption_short"] = desc_box.find_element(
                By.CSS_SELECTOR, 'span[data-e2e="new-desc-span"]'
            ).text.strip()
            try:
                record["caption_detail"] = _read_caption_detail(driver)
            except Exception:
                pass
        except NoSuchElementException:
            pass
    except TimeoutException:
        pass

    # Comments: wait for the list, load as much as possible, then parse.
    try:
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "div[class*='DivCommentListContainer']"))
        )
        _load_comment_threads(driver)
        _collect_comments(driver, record["comments"])
    except TimeoutException:
        pass

    return record
# ── Public API ─────────────────────────────────────────────────────────────────
def scrape_tiktok(cookie_str: str, target_username: str, max_videos: int = 20) -> list:
    """Scrape captions and comments from a TikTok profile.

    Args:
        cookie_str: Login cookies (raw string, JSON array or JSON object).
            Empty or blank means the scrape runs without cookies.
        target_username: Profile name; surrounding whitespace and a leading
            "@" are ignored.
        max_videos: Maximum number of videos to scrape.

    Returns:
        List of dicts with: url, profile_username, upload_date, like_count,
        caption_short, caption_detail, comments, scrape_date. Returns [] when
        the username is empty or ``max_videos`` is not positive (no browser
        is started in those cases), or when the scrape fails before any
        video was read.
    """
    username = (target_username or "").strip().lstrip("@").strip()
    if not username:
        print("[TikTok] target_username tidak ada.")
        return []
    if max_videos <= 0:
        # Nothing was requested, so do not launch a browser at all.
        return []

    profile_url = f"https://www.tiktok.com/@{username}"
    driver = _create_driver(mobile=False)
    all_data: list = []
    try:
        if cookie_str and cookie_str.strip():
            if not _inject_cookies(driver, cookie_str):
                # Not fatal: the scrape still runs, just without a login.
                print("[TikTok] Cookie injection failed; continuing without login.")

        links = _get_video_links(driver, profile_url, max_videos)
        for url in links:
            try:
                data = _scrape_video(driver, url, username)
                if data:
                    all_data.append(data)
            except Exception as e:
                # One broken video must not abort the remaining ones.
                print(f"[TikTok] Error {url}: {e}")
            # Short pause between videos.
            time.sleep(1.5)
    except Exception as e:
        print(f"[TikTok] Fatal error: {e}")
    finally:
        try:
            driver.quit()
        except Exception:
            pass
    return all_data