| """ |
| medos.py — Instagram scraper using Selenium. |
| Exports: scrape_medos(username, password, target_account, mode) -> list[str] |
| |
| Strategy: |
| 1. Try saved cookies first (faster, avoids login throttling). |
| 2. Fall back to username/password login via mobile IG version. |
| 3. Collect post links from profile / hashtag page. |
| 4. Scrape caption + visible comments from each post. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
| import time |
| from datetime import datetime, timedelta |
|
|
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.support.ui import WebDriverWait |
| from selenium.webdriver.support import expected_conditions as EC |
| from selenium.common.exceptions import TimeoutException, NoSuchElementException |
|
|
| from ._driver import _create_driver |
|
|
# Root URL of the Instagram web site; the login, profile and hashtag URLs
# used in this module are all built by appending a path to it.
IG_BASE = "https://www.instagram.com/"
|
|
|
|
| |
|
|
def _save_cookies(driver, path: str) -> None:
    """Persist the driver's current cookies to *path* as JSON.

    The save is best-effort: any failure is logged and swallowed so that a
    cookie problem never aborts a scrape.

    The cookies are fetched *before* any file is opened and are written to a
    sibling temporary file that is then moved over *path*. A failure at any
    step therefore leaves a previously saved cookie file untouched instead of
    truncating it to an empty file.

    Args:
        driver: Object exposing ``get_cookies()`` (a Selenium WebDriver).
        path: Destination file for the JSON cookie list.
    """
    tmp_path = f"{path}.tmp"
    try:
        cookies = driver.get_cookies()
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cookies, f, ensure_ascii=False, indent=2)
        # os.replace overwrites the destination in a single rename step.
        os.replace(tmp_path, path)
    except Exception as e:
        # Broad on purpose: driver, serialisation and filesystem errors are
        # all non-fatal here.
        print(f"[Medos] Gagal simpan cookies: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            # The temp file was never created, or is already gone.
            pass
|
|
|
|
def _load_cookies(driver, path: str) -> bool:
    """Replace the browser's cookies with the ones stored in *path*.

    Returns False without touching the driver when *path* does not exist or
    is an empty file, and False (after logging) when reading or applying the
    file raises. Returns True once every stored cookie has been offered to
    the driver, even if individual cookies were rejected.
    """
    file_unusable = not os.path.exists(path) or os.path.getsize(path) == 0
    if file_unusable:
        return False

    # Only these cookie attributes are forwarded to the driver.
    wanted_keys = {"name", "value", "domain", "path", "secure", "httpOnly", "expiry"}

    try:
        with open(path, "r", encoding="utf-8") as fh:
            stored = json.load(fh)

        # Open the site before setting cookies.
        # NOTE(review): presumably required because the driver only accepts
        # cookies for the currently loaded domain - confirm.
        driver.get(IG_BASE)
        time.sleep(2)
        driver.delete_all_cookies()

        for raw in stored:
            cookie = {key: raw[key] for key in raw.keys() & wanted_keys}
            expiry = cookie.get("expiry")
            if isinstance(expiry, float):
                cookie["expiry"] = int(expiry)

            try:
                driver.add_cookie(cookie)
            except Exception:
                # Second attempt without the domain attribute; a cookie that
                # is rejected twice is skipped silently.
                cookie.pop("domain", None)
                try:
                    driver.add_cookie(cookie)
                except Exception:
                    pass
        return True
    except Exception as err:
        print(f"[Medos] Gagal load cookies: {err}")
        return False
|
|
|
|
def _is_logged_in(driver) -> bool:
    """Report whether the driver currently holds a cookie named ``sessionid``.

    Only the presence of the cookie is checked, not its value or expiry.
    """
    for cookie in driver.get_cookies():
        if cookie.get("name") == "sessionid":
            return True
    return False
|
|
|
|
| |
|
|
def _find_first_element(driver, css_selectors):
    """Return the first element matched by any of *css_selectors*, else None."""
    for selector in css_selectors:
        try:
            return driver.find_element(By.CSS_SELECTOR, selector)
        except NoSuchElementException:
            continue
    return None


def _login(driver, username: str, password: str, cookies_file: str) -> bool:
    """Log the browser session in to Instagram.

    Steps:
      1. Load cookies from *cookies_file*; if the session then has a
         ``sessionid`` cookie, return True immediately.
      2. Otherwise open ``accounts/login/``, fill in the credentials and
         submit the form.
      3. Dismiss up to two "Not Now" dialogs, confirm that a ``sessionid``
         cookie exists, and save the cookies to *cookies_file*.

    Both login paths now use the same success criterion (``_is_logged_in``).
    Previously the password path returned True, and saved cookies, as soon as
    the URL stopped containing "login", even when no session cookie had been
    issued.

    Args:
        driver: Selenium WebDriver to authenticate.
        username: Instagram login name.
        password: Instagram password.
        cookies_file: Path of the JSON cookie cache to read and refresh.

    Returns:
        True when the session holds a ``sessionid`` cookie, False otherwise.
        Failures are logged, not raised, except for unexpected errors while
        waiting for the login form, which propagate to the caller.
    """
    # --- 1. Saved session -------------------------------------------------
    if _load_cookies(driver, cookies_file):
        driver.get(IG_BASE)
        time.sleep(3)
        if _is_logged_in(driver):
            print("[Medos] Login via cookies OK.")
            return True
        print("[Medos] Cookies kadaluarsa, coba login manual.")

    # --- 2. Username / password form ---------------------------------------
    driver.get(f"{IG_BASE}accounts/login/")
    print("[Medos] Membuka halaman login Instagram…")

    try:
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "input[name='username'], input[name='email']")
            )
        )
    except TimeoutException:
        print("[Medos] Halaman login tidak termuat.")
        # Keep the page source and a screenshot for debugging (best-effort).
        try:
            with open("/app/static/output/ig_login_error.html", "w", encoding="utf-8") as f:
                f.write(driver.page_source)
            driver.save_screenshot("/app/static/output/ig_login_error.png")
            print("[Medos] Log error HTML dan screenshot disimpan ke /app/static/output/")
        except Exception as e:
            print(f"[Medos] Gagal menyimpan log error: {e}")
        return False

    try:
        # Each field is looked up under two alternative names.
        user_field = _find_first_element(
            driver, ["input[name='username']", "input[name='email']"]
        )
        pass_field = _find_first_element(
            driver, ["input[name='password']", "input[name='pass']"]
        )
        if user_field is None or pass_field is None:
            print("[Medos] Field login (username/password) tidak ditemukan.")
            return False

        user_field.clear()
        user_field.send_keys(username)
        time.sleep(0.8)
        pass_field.clear()
        pass_field.send_keys(password)
        time.sleep(0.5)

        # Submit with Enter first ...
        pass_field.send_keys("\n")
        time.sleep(1)

        # ... then also click a submit control as a fallback. Errors are
        # ignored on purpose: Enter may already have navigated away, so the
        # button can be missing or stale.
        try:
            submit_btn = driver.find_element(
                By.CSS_SELECTOR,
                "button[type='submit'], input[type='submit'], div[role='button']",
            )
            driver.execute_script("arguments[0].click();", submit_btn)
        except Exception:
            pass

        # "/accounts/login/" itself contains "login", so one case-insensitive
        # test covers both clauses of the original condition.
        WebDriverWait(driver, 20).until(
            lambda d: "login" not in d.current_url.lower()
        )
    except TimeoutException:
        print("[Medos] Login timeout — cek credentials atau akun ter-throttle.")
        return False
    except Exception as e:
        print(f"[Medos] Login gagal: {e}")
        return False

    # --- 3. Post-login dialogs ---------------------------------------------
    # Up to two "Not Now" prompts are dismissed; a missing prompt is not an
    # error.
    for _ in range(2):
        try:
            WebDriverWait(driver, 6).until(
                EC.element_to_be_clickable((
                    By.XPATH,
                    "//button[contains(text(),'Not Now') or "
                    "contains(text(),'Bukan Sekarang') or "
                    "contains(text(),'Not now')]"
                ))
            ).click()
            time.sleep(1.5)
        except Exception:
            pass

    # Leaving the login URL is not proof of a session, so apply the same
    # check the cookie path uses before reporting success or caching cookies.
    if not _is_logged_in(driver):
        print("[Medos] Login belum menghasilkan sesi (cookie sessionid tidak ada), "
              "kemungkinan perlu verifikasi tambahan.")
        return False

    print("[Medos] Login sukses.")
    _save_cookies(driver, cookies_file)
    return True
|
|
|
|
| |
|
|
def _collect_post_links(driver, target_url: str, max_scrolls: int = 5) -> list:
    """Open *target_url* and collect the post and reel links shown on it.

    Anchors whose ``href`` contains ``/p/`` or ``/reel/`` are harvested once
    after the page loads and again after every scroll to the bottom, so links
    revealed by the final scroll are included. Scrolling stops after
    *max_scrolls* scrolls, or earlier after three consecutive scrolls that
    reveal no new link.

    Args:
        driver: Selenium WebDriver, already authenticated if required.
        target_url: Profile or hashtag page to open.
        max_scrolls: Maximum number of scroll-to-bottom steps.

    Returns:
        Unique links with the query string removed, in the order they were
        first seen on the page. Callers that truncate the list therefore keep
        the first posts shown rather than an arbitrary subset.
    """
    # Imported locally so the block does not depend on a change to the
    # file-level import list; selenium is already a dependency of this file.
    from selenium.common.exceptions import StaleElementReferenceException

    print(f"[Medos] Membuka: {target_url}")
    driver.get(target_url)
    time.sleep(6)

    # A dict is used as an insertion-ordered set.
    links: dict = {}

    def harvest() -> None:
        """Add every post/reel link currently in the DOM to *links*."""
        anchors = driver.find_elements(
            By.CSS_SELECTOR, "a[href*='/p/'], a[href*='/reel/']"
        )
        for anchor in anchors:
            try:
                href = anchor.get_attribute("href")
            except StaleElementReferenceException:
                # The anchor left the DOM between lookup and read; skip it
                # rather than abort the whole collection.
                continue
            if href:
                links.setdefault(href.split("?")[0], None)

    harvest()
    stall = 0
    for i in range(max_scrolls):
        prev_count = len(links)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3.5)
        harvest()
        print(f"[Medos] Scroll {i+1}: {len(links)} link ditemukan.")
        if len(links) == prev_count:
            stall += 1
            if stall >= 3:
                break
        else:
            stall = 0

    return list(links)
|
|
|
|
def _scrape_post(driver, link: str) -> list:
    """Open one post and return its caption followed by the comment texts.

    The caption is taken from the first candidate locator that yields more
    than three characters of text. "Load more comments" is then clicked up to
    five times, and finally every matching comment span longer than three
    characters is appended once (duplicates are skipped).
    """
    driver.get(link)
    time.sleep(4)

    collected = []

    # --- Caption: candidate locators, tried in order ------------------------
    caption_locators = (
        (By.XPATH, "//div[@data-testid='post-caption']"),
        (By.XPATH, "//h1"),
        (By.XPATH, "//span[contains(@class, 'x126k92a')]"),
        (By.CSS_SELECTOR, "article span[dir='auto']"),
    )
    for strategy, locator in caption_locators:
        try:
            node = WebDriverWait(driver, 3).until(
                EC.presence_of_element_located((strategy, locator))
            )
            caption = node.text.strip()
            if not caption:
                # Fall back to the DOM innerText when .text is empty.
                caption = driver.execute_script("return arguments[0].innerText;", node)
            if caption and len(caption) > 3:
                collected.append(caption.strip())
                break
        except Exception:
            continue

    # --- Expand comments: icon first, text button as fallback ---------------
    load_more_locators = (
        (
            By.CSS_SELECTOR,
            "svg[aria-label='Load more comments'], svg[aria-label='Muat komentar lainnya']",
        ),
        (
            By.XPATH,
            "//div[@role='button']//span[contains(text(),'Load') or contains(text(),'Muat')]",
        ),
    )
    for _ in range(5):
        for strategy, locator in load_more_locators:
            try:
                control = driver.find_element(strategy, locator)
                driver.execute_script("arguments[0].click();", control)
                time.sleep(2)
                break
            except Exception:
                continue
        else:
            # Neither control could be found and clicked: nothing more to load.
            break

    # --- Comments -----------------------------------------------------------
    comment_xpaths = (
        "//div[contains(@class, 'x1cy8zhl')]/span",
        "//ul//li//span[@dir='auto']",
        "//div[@role='button']//span[@dir='auto']",
        "//div[contains(@class, 'x1xegmmw')]//span[@dir='auto']",
    )
    try:
        already_seen = set(collected)
        for comment_xpath in comment_xpaths:
            for node in driver.find_elements(By.XPATH, comment_xpath):
                try:
                    body = node.text.strip()
                    if body and len(body) > 3 and body not in already_seen:
                        already_seen.add(body)
                        collected.append(body)
                except Exception:
                    # A single unreadable node must not abort the pass.
                    pass
    except Exception as err:
        print(f"[Medos] Gagal ambil komentar: {err}")

    return collected
|
|
|
|
| |
|
|
def scrape_medos(
    username: str,
    password: str,
    target_account: str,
    mode: str = "all",
    *,
    max_posts: int = 30,
    max_scrolls: int = 5,
) -> list:
    """Scrape an Instagram profile or hashtag and return the collected texts.

    Args:
        username: Instagram login name; also used to name the cookie cache.
        password: Instagram password.
        target_account: Profile name (an optional leading ``@`` is removed)
            or a hashtag written with a leading ``#``.
        mode: ``'all'`` or ``'date'``. The value is currently not used to
            filter posts; any value other than ``'all'`` only produces a log
            line saying so.
        max_posts: Maximum number of collected links that are opened and
            scraped (default 30, the previously hard-coded limit).
        max_scrolls: Maximum number of page scrolls while collecting links
            (default 5, the previously hard-coded limit).

    Returns:
        Caption and comment strings from the scraped posts. An empty list is
        returned when a parameter is missing, when the target is empty after
        normalisation, or when login fails.
    """
    if not username or not password or not target_account:
        print("[Medos] Parameter tidak lengkap.")
        return []

    # Normalise the target BEFORE starting a browser. Input such as "   ",
    # "#" or "@" used to pass the check above and resolve to the Instagram
    # home page (or an empty tag URL) after a full login.
    account = str(target_account).strip()
    is_hashtag = account.startswith("#")
    handle = (account.lstrip("#") if is_hashtag else account.lstrip("@")).strip()
    if not handle:
        print("[Medos] Parameter tidak lengkap.")
        return []

    if mode != "all":
        # Make the unimplemented filter visible instead of ignoring it silently.
        print(f"[Medos] Mode '{mode}' belum diimplementasikan, semua postingan diproses.")

    if is_hashtag:
        target_url = f"{IG_BASE}explore/tags/{handle}/"
    else:
        target_url = f"{IG_BASE}{handle}/"

    # The user name becomes part of a file name, so path separators are
    # replaced to keep the cookie file inside /app.
    safe_user = str(username).replace("/", "_").replace("\\", "_")
    cookies_file = f"/app/ig_cookies_{safe_user}.json"

    driver = _create_driver(mobile=False)
    texts_out: list = []

    try:
        if not _login(driver, username, password, cookies_file):
            print("[Medos] Login gagal, scraping dibatalkan.")
            return []

        post_links = _collect_post_links(driver, target_url, max_scrolls=max_scrolls)
        print(f"[Medos] {len(post_links)} link postingan ditemukan untuk '{account}'.")

        for link in post_links[:max_posts]:
            # One failing post must not stop the remaining ones.
            try:
                result = _scrape_post(driver, link)
                texts_out.extend(result)
                print(f"[Medos] {link} → {len(result)} teks")
            except Exception as e:
                print(f"[Medos] Error pada {link}: {e}")

    except Exception as e:
        # Top-level boundary: log and return whatever was collected so far.
        print(f"[Medos] Fatal error: {e}")
    finally:
        # Always release the browser; a failing quit() is not actionable.
        try:
            driver.quit()
        except Exception:
            pass

    print(f"[Medos] Total teks dari Instagram: {len(texts_out)}")
    return texts_out