# Source path: Sentiment/services/medos.py
# Commit fa8ff66 (NzTama): "Initial clean deploy: Sentiment Analysis"
"""
medos.py – Instagram scraper using Selenium.
Exports: scrape_medos(username, password, target_account, mode) -> list[str]
Strategy:
1. Try saved cookies first (faster, avoids login throttling).
2. Fall back to username/password login on the Instagram login page (the driver is created with mobile=False).
3. Collect post links from profile / hashtag page.
4. Scrape caption + visible comments from each post.
"""
from __future__ import annotations
import json
import os
import time
from datetime import datetime, timedelta
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from ._driver import _create_driver
IG_BASE = "https://www.instagram.com/"
# ── Cookie helpers ─────────────────────────────────────────────────────────────
def _save_cookies(driver, path: str) -> None:
    """Persist the driver's current cookies to *path* as pretty-printed JSON.

    The cookies are fetched and serialised *before* the file is opened, so a
    failing driver or an unserialisable cookie cannot truncate a cookie file
    that was saved earlier. Failures are reported on stdout and never raised:
    saving cookies is a best-effort step.

    Args:
        driver: Object exposing ``get_cookies()`` (presumably a Selenium
            WebDriver).
        path: Destination file; overwritten only when serialisation succeeded.
    """
    try:
        payload = json.dumps(driver.get_cookies(), ensure_ascii=False, indent=2)
        with open(path, "w", encoding="utf-8") as f:
            f.write(payload)
    except Exception as e:
        print(f"[Medos] Gagal simpan cookies: {e}")
def _load_cookies(driver, path: str) -> bool:
    """Replace the driver's cookies with the ones stored in the JSON file *path*.

    Args:
        driver: Browser driver exposing ``get``, ``delete_all_cookies`` and
            ``add_cookie``.
        path: JSON file holding a list of cookie dicts.

    Returns:
        False when the file is missing, empty, or any step outside the
        per-cookie insertion raises; True once every stored cookie has been
        offered to the driver (cookies the driver rejects are skipped).
    """
    file_missing = not os.path.exists(path)
    if file_missing or os.path.getsize(path) == 0:
        return False

    accepted_keys = {"name", "value", "domain", "path", "secure", "httpOnly", "expiry"}
    try:
        with open(path, "r", encoding="utf-8") as fh:
            stored = json.load(fh)

        # Open the site first, then start from a clean cookie jar.
        driver.get(IG_BASE)
        time.sleep(2)
        driver.delete_all_cookies()

        for raw in stored:
            # Keep only the whitelisted cookie fields.
            cookie = {key: raw[key] for key in raw.keys() & accepted_keys}
            expiry = cookie.get("expiry")
            if isinstance(expiry, float):
                cookie["expiry"] = int(expiry)

            # First attempt as stored; on failure retry once without "domain".
            for _attempt in range(2):
                try:
                    driver.add_cookie(cookie)
                    break
                except Exception:
                    cookie.pop("domain", None)
        return True
    except Exception as e:
        print(f"[Medos] Gagal load cookies: {e}")
        return False
def _is_logged_in(driver) -> bool:
    """Report whether the driver currently holds a cookie named ``sessionid``.

    Only the cookie's presence is checked; its value is not inspected.
    """
    for cookie in driver.get_cookies():
        if cookie.get("name") == "sessionid":
            return True
    return False
# ── Login ──────────────────────────────────────────────────────────────────────
def _first_matching(driver, selectors):
    """Return the first element found for any CSS selector in *selectors*, else None."""
    for selector in selectors:
        try:
            return driver.find_element(By.CSS_SELECTOR, selector)
        except NoSuchElementException:
            continue
    return None


def _dump_login_failure(driver, out_dir: str = "/app/static/output") -> None:
    """Write the current page's HTML and a screenshot to *out_dir* for debugging.

    The directory is created when missing. Any failure is printed, never raised.
    """
    try:
        os.makedirs(out_dir, exist_ok=True)
        with open(f"{out_dir}/ig_login_error.html", "w", encoding="utf-8") as f:
            f.write(driver.page_source)
        driver.save_screenshot(f"{out_dir}/ig_login_error.png")
        print(f"[Medos] Log error HTML dan screenshot disimpan ke {out_dir}/")
    except Exception as e:
        print(f"[Medos] Gagal menyimpan log error: {e}")


def _login(driver, username: str, password: str, cookies_file: str) -> bool:
    """Log the browser session in to Instagram.

    Saved cookies are tried first. When they are missing or do not produce a
    ``sessionid`` cookie, the login form is filled in with *username* and
    *password*. After a form login, up to two "Not Now" dialogs are dismissed
    and the session cookies are written to *cookies_file*.

    Args:
        driver: Browser driver (presumably a Selenium WebDriver).
        username: Value typed into the username/email field.
        password: Value typed into the password field.
        cookies_file: JSON file used to load and later save session cookies.

    Returns:
        True when the cookie login worked or the browser left the login URL
        after submitting the form. False when the login page did not load,
        the fields were not found, the redirect did not happen within 20 s,
        or another error occurred.
    """
    # 1. Try saved cookies
    if _load_cookies(driver, cookies_file):
        driver.get(IG_BASE)
        time.sleep(3)
        if _is_logged_in(driver):
            print("[Medos] Login via cookies OK.")
            return True
        print("[Medos] Cookies kadaluarsa, coba login manual.")

    # 2. Username/password login
    driver.get(f"{IG_BASE}accounts/login/")
    print("[Medos] Membuka halaman login Instagram…")
    try:
        # Wait for the username OR email field to exist.
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "input[name='username'], input[name='email']")
            )
        )
    except TimeoutException:
        print("[Medos] Halaman login tidak termuat.")
        _dump_login_failure(driver)
        return False

    try:
        # The form may name its inputs username/password or email/pass.
        user_field = _first_matching(driver, ("input[name='username']", "input[name='email']"))
        pass_field = _first_matching(driver, ("input[name='password']", "input[name='pass']"))
        if user_field is None or pass_field is None:
            print("[Medos] Field login (username/password) tidak ditemukan.")
            return False

        user_field.clear()
        user_field.send_keys(username)
        time.sleep(0.8)
        pass_field.clear()
        pass_field.send_keys(password)
        time.sleep(0.5)

        # Submit the form by pressing ENTER inside the password field.
        pass_field.send_keys("\n")
        time.sleep(1)

        # Fallback: also click a submit control if one exists.
        # NOTE(review): "div[role='button']" is broad and the first match in
        # document order may not be the login button - confirm on the live page.
        try:
            submit_btn = driver.find_element(
                By.CSS_SELECTOR,
                "button[type='submit'], input[type='submit'], div[role='button']",
            )
            driver.execute_script("arguments[0].click();", submit_btn)
        except Exception:
            pass

        # Wait for a redirect away from any URL containing "login"
        # (this also covers "/accounts/login/").
        # NOTE(review): leaving the login URL is treated as success even if no
        # sessionid cookie was set (e.g. a challenge page) - consider checking
        # _is_logged_in() here.
        WebDriverWait(driver, 20).until(
            lambda d: "login" not in d.current_url.lower()
        )
        print("[Medos] Login sukses.")
    except TimeoutException:
        print("[Medos] Login timeout — cek credentials atau akun ter-throttle.")
        return False
    except Exception as e:
        print(f"[Medos] Login gagal: {e}")
        return False

    # 3. Dismiss save-info / notification popups (at most two dialogs).
    for _ in range(2):
        try:
            WebDriverWait(driver, 6).until(
                EC.element_to_be_clickable((
                    By.XPATH,
                    "//button[contains(text(),'Not Now') or "
                    "contains(text(),'Bukan Sekarang') or "
                    "contains(text(),'Not now')]"
                ))
            ).click()
            time.sleep(1.5)
        except Exception:
            # No dialog appeared within the wait; nothing to dismiss.
            pass

    _save_cookies(driver, cookies_file)
    return True
# ── Scraping helpers ───────────────────────────────────────────────────────────
def _collect_post_links(driver, target_url: str, max_scrolls: int = 5) -> list:
    """Open *target_url* and gather unique post/reel links while scrolling down.

    Links are returned in the order they were first seen on the page
    (presumably newest first on a profile grid - confirm), so a caller that
    caps the list keeps a stable selection. Query strings are stripped before
    de-duplication. Scrolling stops after *max_scrolls* passes, or earlier
    after three consecutive passes that add no new link.

    Args:
        driver: Browser driver (presumably a Selenium WebDriver).
        target_url: Profile or hashtag page to open.
        max_scrolls: Maximum number of harvest-and-scroll passes.

    Returns:
        List of unique link strings without query parameters.
    """
    print(f"[Medos] Membuka: {target_url}")
    driver.get(target_url)
    time.sleep(6)

    # Dict keys give de-duplication while keeping first-seen order
    # (a set would return the links in arbitrary order).
    links: dict = {}
    stall = 0
    for i in range(max_scrolls):
        prev_count = len(links)
        for el in driver.find_elements(By.CSS_SELECTOR, "a[href*='/p/'], a[href*='/reel/']"):
            try:
                href = el.get_attribute("href")
            except Exception:
                # One unreadable anchor (e.g. re-rendered mid-read) must not
                # abort the run; a later pass may still pick it up.
                continue
            if href:
                links.setdefault(href.split("?")[0], None)
        print(f"[Medos] Scroll {i+1}: {len(links)} link ditemukan.")

        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3.5)

        if len(links) == prev_count:
            stall += 1
            if stall >= 3:
                break
        else:
            stall = 0
    return list(links)
def _read_caption(driver) -> str:
    """Return the caption from the first selector that yields more than 3 characters, else ''."""
    # Caption locators, based on medos_scraping.py.
    caption_selectors = [
        (By.XPATH, "//div[@data-testid='post-caption']"),
        (By.XPATH, "//h1"),
        (By.XPATH, "//span[contains(@class, 'x126k92a')]"),
        (By.CSS_SELECTOR, "article span[dir='auto']"),
    ]
    for by, sel in caption_selectors:
        try:
            el = WebDriverWait(driver, 3).until(EC.presence_of_element_located((by, sel)))
            text = el.text.strip()
            if not text:
                # .text can be empty due to formatting; fall back to innerText via JS.
                raw = driver.execute_script("return arguments[0].innerText;", el)
                # Strip BEFORE the length check so whitespace-only text is rejected.
                text = (raw or "").strip()
            if len(text) > 3:
                return text
        except Exception:
            continue
    return ""


def _expand_comments(driver, max_clicks: int = 5) -> None:
    """Click the "load more comments" control up to *max_clicks* times."""
    locators = (
        # Default: the svg icon.
        (
            By.CSS_SELECTOR,
            "svg[aria-label='Load more comments'], svg[aria-label='Muat komentar lainnya']",
        ),
        # Fallback: a button whose text contains "Load" / "Muat".
        (
            By.XPATH,
            "//div[@role='button']//span[contains(text(),'Load') or contains(text(),'Muat')]",
        ),
    )
    for _ in range(max_clicks):
        for by, sel in locators:
            try:
                driver.execute_script("arguments[0].click();", driver.find_element(by, sel))
                time.sleep(2)
                break
            except Exception:
                continue
        else:
            # Neither control could be clicked: nothing more to expand.
            return


def _read_comments(driver, already_seen) -> list:
    """Return visible comment texts (> 3 chars) that are not in *already_seen*.

    De-duplication is by text, so identical comments are returned once.
    Texts gathered before an unexpected error are still returned.
    """
    # Locators from an earlier working script (medos_scraping.py) plus fallbacks.
    # NOTE(review): the role='button' locator may also match UI labels - confirm.
    xpaths = [
        "//div[contains(@class, 'x1cy8zhl')]/span",
        "//ul//li//span[@dir='auto']",
        "//div[@role='button']//span[@dir='auto']",
        "//div[contains(@class, 'x1xegmmw')]//span[@dir='auto']",
    ]
    seen = set(already_seen)
    found = []
    try:
        for xpath in xpaths:
            for span in driver.find_elements(By.XPATH, xpath):
                try:
                    text = span.text.strip()
                except Exception:
                    # Element became unreadable; skip it.
                    continue
                if len(text) > 3 and text not in seen:
                    seen.add(text)
                    found.append(text)
    except Exception as e:
        print(f"[Medos] Gagal ambil komentar: {e}")
    return found


def _scrape_post(driver, link: str) -> list:
    """Return the text strings (caption first, then comments) found on one post.

    Args:
        driver: Browser driver (presumably a Selenium WebDriver).
        link: URL of the post to open.

    Returns:
        List of unique strings, each longer than 3 characters; empty when
        nothing usable was found.
    """
    driver.get(link)
    time.sleep(4)

    texts = []
    caption = _read_caption(driver)
    if caption:
        texts.append(caption)

    _expand_comments(driver)
    texts.extend(_read_comments(driver, texts))
    return texts
# ── Public API ─────────────────────────────────────────────────────────────────
def _resolve_target_url(target_account: str) -> str:
    """Map a profile name or '#hashtag' to its Instagram URL; '' when nothing usable remains."""
    account = target_account.strip()
    if account.startswith("#"):
        tag = account.lstrip("#").strip()
        return f"{IG_BASE}explore/tags/{tag}/" if tag else ""
    handle = account.lstrip("@").strip()
    return f"{IG_BASE}{handle}/" if handle else ""


def scrape_medos(username: str, password: str, target_account: str, mode: str = "all",
                 *, max_posts: int = 30) -> list:
    """Scrape Instagram profile/hashtag posts and return a list of text strings.

    Args:
        username: Instagram login name; also used (sanitised) in the name of
            the cookie file.
        password: Instagram password.
        target_account: Profile name (optionally prefixed with '@') or a
            hashtag prefixed with '#'.
        mode: Accepted for compatibility. Only 'all' is implemented; any other
            value (e.g. 'date') is currently ignored and a notice is printed.
        max_posts: Maximum number of collected post links to scrape.

    Returns:
        Caption/comment strings gathered from the scraped posts. Empty when a
        parameter is missing, the target is blank, login fails, or a fatal
        error occurs before any post was read.
    """
    if not username or not password or not target_account:
        print("[Medos] Parameter tidak lengkap.")
        return []

    # Validate the target before starting a browser: a blank, "#"-only or
    # "@"-only target would otherwise resolve to the Instagram home page.
    target_url = _resolve_target_url(target_account)
    if not target_url:
        print("[Medos] Target akun/hashtag kosong.")
        return []
    account = target_account.strip()

    if mode != "all":
        print(f"[Medos] Mode '{mode}' belum didukung; semua postingan diproses.")

    # Keep the username from escaping the cookie directory: anything that is
    # not alphanumeric or one of "._@+-" (e.g. a path separator) becomes "_".
    safe_user = "".join(ch if ch.isalnum() or ch in "._@+-" else "_" for ch in username)
    cookies_file = f"/app/ig_cookies_{safe_user}.json"

    driver = _create_driver(mobile=False)
    texts_out: list = []
    try:
        if not _login(driver, username, password, cookies_file):
            print("[Medos] Login gagal, scraping dibatalkan.")
            return []

        post_links = _collect_post_links(driver, target_url, max_scrolls=5)
        print(f"[Medos] {len(post_links)} link postingan ditemukan untuk '{account}'.")

        for link in post_links[:max_posts]:
            try:
                result = _scrape_post(driver, link)
                texts_out.extend(result)
                print(f"[Medos] {link} → {len(result)} teks")
            except Exception as e:
                # One failing post must not abort the remaining ones.
                print(f"[Medos] Error pada {link}: {e}")
    except Exception as e:
        print(f"[Medos] Fatal error: {e}")
    finally:
        # Best-effort shutdown; a failing quit() must not mask the result.
        try:
            driver.quit()
        except Exception:
            pass

    print(f"[Medos] Total teks dari Instagram: {len(texts_out)}")
    return texts_out