# fb_scraper/final5.py
import os
import re
import sys
import time
import json
import base64
import pickle
import argparse
import traceback
import shutil
from typing import List, Dict, Any, Tuple
from datetime import datetime
import tempfile
# Force UTF-8 output so log lines containing non-ASCII post text don't crash
# on consoles with a narrower default encoding.
try:
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
    pass
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, NoSuchElementException, TimeoutException
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.generativeai as genai
from google.api_core.exceptions import ResourceExhausted
WRITABLE_DIR = "/tmp"
SERVICE_ACCOUNT_FILE = os.path.join(WRITABLE_DIR, "service_account.json")
def get_args():
    p = argparse.ArgumentParser(description="Scrape one FB group, analyze, and email alerts.")
    p.add_argument("--group", required=True)
    p.add_argument("--out", required=True)
    p.add_argument("--analysis-out", required=True)
    p.add_argument("--recipients", default="")
    p.add_argument("--sender", default=os.environ.get("SENDER_EMAIL", ""))
    p.add_argument("--cookies-file", default=os.path.join(WRITABLE_DIR, "facebook_cookies.pkl"))
    p.add_argument("--max-scrolls", type=int, default=int(os.environ.get("MAX_SCROLLS", "5")))
    p.add_argument("--scroll-pause", type=float, default=float(os.environ.get("SCROLL_PAUSE", "3")))
    p.add_argument("--gemini-keys", default="")
    p.add_argument("--headless", action="store_true")
    return p.parse_args()
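# Example invocation (group URL, paths, and keys below are hypothetical):
#   python final5.py --group https://www.facebook.com/groups/example \
#       --out /tmp/posts.json --analysis-out /tmp/analysis.json \
#       --gemini-keys "key1,key2" --headless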
def new_driver(headless: bool) -> Tuple[webdriver.Chrome, str]:
    options = webdriver.ChromeOptions()
    # Isolate each run in a throwaway Chrome profile under /tmp; the caller is
    # responsible for deleting it (see try_scrape_with_fallback).
    user_data_dir = tempfile.mkdtemp(prefix="chrome_user_data_", dir=WRITABLE_DIR)
    options.add_argument(f"--user-data-dir={user_data_dir}")
    if headless:
        options.add_argument("--headless=new")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-gpu")
    options.add_argument("--disable-notifications")
    options.add_argument("--window-size=1920,1080")
    options.add_argument("--disable-extensions")
    options.add_argument("--remote-debugging-port=9222")
    options.add_argument("user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
    # Point Selenium Manager and webdriver-manager caches at a writable location
    # so driver downloads work in read-only container filesystems.
    os.environ.setdefault("HOME", WRITABLE_DIR)
    os.environ.setdefault("WDM_LOCAL", "1")
    os.environ.setdefault("WDM_CACHE_DIR", os.path.join(WRITABLE_DIR, ".wdm"))
    os.environ.setdefault("SE_MANAGER_DRIVER_CACHE", os.path.join(WRITABLE_DIR, "selenium"))
    os.makedirs(os.environ["WDM_CACHE_DIR"], exist_ok=True)
    os.makedirs(os.environ["SE_MANAGER_DRIVER_CACHE"], exist_ok=True)
    service = ChromeService()
    driver = webdriver.Chrome(service=service, options=options)
    print("[SELENIUM] WebDriver session created successfully.")
    return driver, user_data_dir
def build_gmail_service():
    if not os.path.exists(SERVICE_ACCOUNT_FILE):
        return None
    try:
        sender_email = os.environ.get("SENDER_EMAIL")
        if not sender_email:
            return None
        # Domain-wide delegation: the service account impersonates the sender
        # via with_subject().
        credentials = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            scopes=["https://www.googleapis.com/auth/gmail.send"]
        ).with_subject(sender_email)
        return build("gmail", "v1", credentials=credentials)
    except Exception as e:
        print(f"[GMAIL] Auth failed in final5.py: {e}")
        return None
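# The script builds a Gmail client but never sends mail ("emails_sent" stays 0
# in the report). A minimal send helper would look like the sketch below; it is
# an illustration only, not wired into main(), and the function name is
# hypothetical. It uses the standard Gmail API users().messages().send() call
# with a base64url-encoded MIME message.
def send_email_via_gmail(service, sender: str, recipients: List[str], subject: str, body: str) -> bool:
    from email.mime.text import MIMEText
    msg = MIMEText(body)
    msg["to"] = ", ".join(recipients)
    msg["from"] = sender
    msg["subject"] = subject
    # Gmail expects the raw RFC 2822 message as base64url.
    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
    try:
        service.users().messages().send(userId="me", body={"raw": raw}).execute()
        return True
    except HttpError as e:
        print(f"[GMAIL] Send failed: {e}")
        return False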
GEMINI_MODEL = "gemini-1.5-flash"
class GeminiManager:
    """Round-robin over multiple Gemini API keys, rotating forward on quota errors."""
    def __init__(self, api_keys: List[str]):
        self.api_keys = api_keys
        self.current_key_index = 0
        self.model = None
        self._setup_model()
    def _setup_model(self):
        if not self.api_keys:
            print("[GEMINI] No API keys provided")
            self.model = None
            return
        while self.current_key_index < len(self.api_keys):
            try:
                api_key = self.api_keys[self.current_key_index]
                genai.configure(api_key=api_key)
                self.model = genai.GenerativeModel(GEMINI_MODEL)
                print(f"[GEMINI] Using API key {self.current_key_index + 1}")
                return
            except Exception as e:
                print(f"[GEMINI] Failed to setup with key {self.current_key_index + 1}: {e}")
                self.current_key_index += 1
        print("[GEMINI] All API keys failed")
        self.model = None
    def rotate_key(self):
        self.current_key_index += 1
        self._setup_model()
    def is_available(self):
        return self.model is not None
    def generate_content(self, prompt: str):
        if not self.is_available():
            raise Exception("No available Gemini model")
        try:
            return self.model.generate_content(prompt)
        except ResourceExhausted:
            # Quota exhausted on the current key: move to the next one and retry once.
            self.rotate_key()
            if self.is_available():
                return self.model.generate_content(prompt)
            raise
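# Example usage (hypothetical keys):
#   mgr = GeminiManager(["key-a", "key-b"])
#   if mgr.is_available():
#       resp = mgr.generate_content("Summarize this post...")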
def ai_medical_intent(gemini_manager: GeminiManager, post_text: str, found_keywords: List[str]) -> Dict[str, Any]:
    # Returned verbatim whenever the model is unavailable or its reply can't be parsed.
    fallback = {
        "is_medical_seeking": False,
        "confidence": "low",
        "medical_summary": "AI unavailable",
        "suggested_services": [],
        "urgency_level": "low",
        "analysis": "Fallback",
        "reasoning": "AI error",
        "matched_keywords": found_keywords
    }
    if not gemini_manager or not gemini_manager.is_available():
        return fallback
    keywords_str = ", ".join(found_keywords) if found_keywords else "none"
    prompt = f"""Analyze this social post to determine if the author is seeking medical help for a personal health need.
KEYWORDS: {keywords_str}
RULES:
1. Flag ONLY posts where someone seeks medical care for themselves or a loved one.
2. IGNORE posts about business, donations, selling products, jobs, or general info.
3. Flag ONLY if it is a PERSONAL HEALTH NEED.
Post: "{post_text}"
Return ONLY JSON:
{{
"is_medical_seeking": true/false,
"confidence": "high/medium/low",
"medical_summary": "short summary",
"suggested_services": ["service1","service2"],
"urgency_level": "high/medium/low",
"analysis": "why it's seeking help",
"reasoning": "short explanation",
"matched_keywords": ["keyword1"]
}}"""
    for _ in range(2):
        try:
            resp = gemini_manager.generate_content(prompt)
            txt = (getattr(resp, "text", "") or "").strip()
            # Extract the outermost JSON object, tolerating prose around it.
            s, e = txt.find("{"), txt.rfind("}") + 1
            if s >= 0 and e > s:
                result = json.loads(txt[s:e])
                result["is_medical_seeking"] = bool(result.get("is_medical_seeking", False))
                if "matched_keywords" not in result:
                    result["matched_keywords"] = found_keywords
                return result
            return fallback
        except Exception as e:
            print(f"[GEMINI] Error: {e}")
            gemini_manager.rotate_key()
    return fallback
MEDICAL_KEYWORDS = [
    "doctor","physician","primary care","healthcare","medical","clinic","hospital","urgent care","emergency","er",
    "specialist","pediatrician","dentist","gynecologist","obgyn","women's health","health center","family doctor",
    "maternity","prenatal","postnatal","labor","delivery","need doctor","looking for doctor","find doctor",
    "recommend doctor","medical help","health help","appointment","checkup","treatment","prescription","medicine",
    "surgery","best hospital","best clinic","where to go","doctor recommendation","pregnancy","birth control",
    "contraception","fertility","hillside","medical group","wellness center"
]
def contains_keywords(text: str) -> Tuple[bool, List[str]]:
    # Match on word boundaries so short keywords like "er" don't fire on
    # substrings of unrelated words ("her", "over", "summer", ...).
    tl = (text or "").lower()
    hits = [kw for kw in MEDICAL_KEYWORDS if re.search(r"\b" + re.escape(kw) + r"\b", tl)]
    return (len(hits) > 0, hits)
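# Example: contains_keywords("Can anyone recommend a pediatrician near Hillside?")
# -> (True, ["pediatrician", "hillside"])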
def load_cookies(driver, cookies_file: str):
    print("[FB] Navigating to Facebook homepage to load cookies...")
    driver.get("https://www.facebook.com")
    time.sleep(2)
    if not os.path.exists(cookies_file):
        raise RuntimeError(f"[FB] FATAL: Cookies file not found at {cookies_file}")
    with open(cookies_file, "rb") as f:
        cookies = pickle.load(f)
    for cookie in cookies:
        # Selenium rejects cookies whose sameSite value isn't one of these three.
        if "sameSite" in cookie and cookie["sameSite"] not in ["Strict", "Lax", "None"]:
            cookie["sameSite"] = "Lax"
        try:
            driver.add_cookie(cookie)
        except Exception:
            pass
    print("[FB] All cookies loaded. Refreshing page to apply session...")
    driver.refresh()
    time.sleep(5)
    if "log in" in driver.title.lower():
        print(f"[FB] WARNING: Login may have failed. Page title is: '{driver.title}'")
    else:
        print(f"[FB] Login appears successful. Page title is: '{driver.title}'")
def wait_group_feed(driver, wait):
    wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    try:
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@role='feed' or @data-pagelet='GroupFeed']")))
        print("[SCRAPE] Group feed detected.")
    except TimeoutException:
        raise TimeoutException("Timed out waiting for group feed to load.")
def scrape_group(driver, wait, group_url: str, max_scrolls: int, pause: float):
    print(f"[SCRAPE] Navigating to group: {group_url}")
    driver.get(group_url)
    wait_group_feed(driver, wait)
    posts, seen = [], set()
    for s in range(max_scrolls):
        print(f"[SCRAPE] --- Scroll {s+1}/{max_scrolls} ---")
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(pause)
        divs = driver.find_elements(By.XPATH, "//div[@role='article']")
        added_this_scroll = 0
        for d in divs:
            try:
                txt = (d.text or "").strip()
                # Skip very short snippets and exact duplicates.
                if len(txt) < 25 or txt in seen:
                    continue
                # Skip comment-widget chrome that also renders as role='article'.
                if any(ui in txt for ui in ["Comment Share", "Write a comment...", "View more comments"]):
                    continue
                seen.add(txt)
                posts.append({"id": len(posts) + 1, "text": txt, "group_link": group_url})
                added_this_scroll += 1
            except StaleElementReferenceException:
                continue
        print(f"[SCRAPE] Found {added_this_scroll} new, unique posts this scroll.")
    print(f"[SCRAPE] Finished scraping. Total unique posts found: {len(posts)}")
    return posts
def try_scrape_with_fallback(group_url: str, cookies_file: str, max_scrolls: int, pause: float):
    driver = None
    user_data_dir = None
    posts = []
    try:
        # NOTE: scraping always runs headless here; the --headless CLI flag is
        # parsed but not consulted.
        driver, user_data_dir = new_driver(headless=True)
        wait = WebDriverWait(driver, 30)
        load_cookies(driver, cookies_file)
        posts = scrape_group(driver, wait, group_url, max_scrolls, pause)
    except Exception as e:
        print(f"[SCRAPE] FATAL ERROR during scraping: {e}")
        raise
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                pass
        if user_data_dir and os.path.exists(user_data_dir):
            try:
                shutil.rmtree(user_data_dir, ignore_errors=True)
                print(f"[SELENIUM] Cleaned up user data directory: {user_data_dir}")
            except Exception as e:
                print(f"[SELENIUM] Error cleaning up directory {user_data_dir}: {e}")
    return posts
def main():
    args = get_args()
    os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True)
    os.makedirs(os.path.dirname(args.analysis_out) or ".", exist_ok=True)
    gemini_keys = [k.strip() for k in args.gemini_keys.split(",") if k.strip()] if args.gemini_keys else []
    gemini_manager = GeminiManager(gemini_keys)
    posts = try_scrape_with_fallback(args.group.strip(), args.cookies_file, args.max_scrolls, args.scroll_pause)
    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(posts, f, ensure_ascii=False, indent=2)
    print(f"[SCRAPE] Saved {len(posts)} scraped posts to {args.out}")
    print(f"::SCRAPE_SAVED::{args.out}")
    # Pass 1: cheap keyword filter over every scraped post.
    keyword_hits, confirmed = [], []
    for p in posts:
        has, hits = contains_keywords(p.get("text", ""))
        if has:
            p["found_keywords"] = hits
            keyword_hits.append(p)
            print(f"::KW_HIT::{json.dumps({'id': p['id'], 'found_keywords': hits}, ensure_ascii=False)}")
    # Pass 2: Gemini confirms intent on keyword hits only, throttled between calls.
    per_call_sleep = 5
    for idx, p in enumerate(keyword_hits, start=1):
        found_kws = p.get("found_keywords", [])
        ai = ai_medical_intent(gemini_manager, p.get("text", ""), found_kws)
        p["ai_analysis"] = ai
        print(f"::AI_RESULT::{json.dumps({'id': p['id'], 'ai': ai}, ensure_ascii=False)}")
        if ai.get("is_medical_seeking"):
            confirmed.append(p)
        if idx < len(keyword_hits):
            time.sleep(per_call_sleep)
    report = {
        "analysis_date": datetime.now().isoformat(),
        "group_link": args.group,
        "total_posts": len(posts),
        "keyword_hits": len(keyword_hits),
        "confirmed_medical": len(confirmed),
        "emails_sent": 0,
        "posts": confirmed
    }
    with open(args.analysis_out, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"[ANALYSIS] Saved analysis to {args.analysis_out}")
    print(f"::ANALYSIS_SAVED::{args.analysis_out}")
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Print the full traceback so failures aren't silent in CI logs.
        traceback.print_exc()
        print("Main execution failed. Exiting with error.")
        sys.exit(1)