"""Scrape one Facebook group, flag posts that look like medical-help requests, and save reports.

Pipeline (see ``main``):
  1. Open the group in Chrome via Selenium using previously saved cookies.
  2. Collect the text of the ``role='article'`` elements while scrolling.
  3. Keep posts that mention any entry of ``MEDICAL_KEYWORDS``.
  4. Ask Gemini whether each kept post is a personal request for medical help.
  5. Write the raw posts and the analysis report as JSON files.
"""

import argparse
import base64
import json
import os
import pickle
import re
import shutil
import sys
import tempfile
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Tuple

# Make console output tolerant of characters the terminal encoding cannot represent.
try:
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
    pass

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, NoSuchElementException, TimeoutException
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.generativeai as genai
from google.api_core.exceptions import ResourceExhausted

WRITABLE_DIR = "/tmp"
SERVICE_ACCOUNT_FILE = os.path.join(WRITABLE_DIR, "service_account.json")


def get_args():
    """Parse the command-line arguments of the script.

    Defaults for ``--sender``, ``--max-scrolls`` and ``--scroll-pause`` are read from
    the ``SENDER_EMAIL``, ``MAX_SCROLLS`` and ``SCROLL_PAUSE`` environment variables.
    """
    p = argparse.ArgumentParser(description="Scrape one FB group, analyze, and email alerts.")
    p.add_argument("--group", required=True)
    p.add_argument("--out", required=True)
    p.add_argument("--analysis-out", required=True)
    p.add_argument("--recipients", default="")
    p.add_argument("--sender", default=os.environ.get("SENDER_EMAIL", ""))
    p.add_argument("--cookies-file", default=os.path.join(WRITABLE_DIR, "facebook_cookies.pkl"))
    p.add_argument("--max-scrolls", type=int, default=int(os.environ.get("MAX_SCROLLS", "5")))
    p.add_argument("--scroll-pause", type=float, default=float(os.environ.get("SCROLL_PAUSE", "3")))
    p.add_argument("--gemini-keys", default="")
    p.add_argument("--headless", action="store_true")
    return p.parse_args()


def new_driver(headless: bool) -> Tuple[webdriver.Chrome, str]:
    """Start a Chrome WebDriver session backed by a fresh temporary profile.

    Args:
        headless: When true, Chrome is started with ``--headless=new``.

    Returns:
        ``(driver, user_data_dir)``. The caller owns both and is expected to quit the
        driver and delete ``user_data_dir`` when done.

    Raises:
        Whatever Selenium raises while starting Chrome; the temporary profile
        directory is removed before the exception propagates.
    """
    options = webdriver.ChromeOptions()
    user_data_dir = tempfile.mkdtemp(prefix="chrome_user_data_", dir=WRITABLE_DIR)
    try:
        options.add_argument(f"--user-data-dir={user_data_dir}")
        if headless:
            options.add_argument("--headless=new")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--disable-notifications")
        options.add_argument("--window-size=1920,1080")
        options.add_argument("--disable-extensions")
        # NOTE(review): a fixed debugging port may collide if several instances run
        # at the same time on one host - confirm this script is never run concurrently.
        options.add_argument("--remote-debugging-port=9222")
        options.add_argument(
            "user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )

        # Point HOME and the driver caches at the writable directory, unless the
        # environment already defines them.
        os.environ.setdefault("HOME", WRITABLE_DIR)
        os.environ.setdefault("WDM_LOCAL", "1")
        os.environ.setdefault("WDM_CACHE_DIR", os.path.join(WRITABLE_DIR, ".wdm"))
        os.environ.setdefault("SE_MANAGER_DRIVER_CACHE", os.path.join(WRITABLE_DIR, "selenium"))
        os.makedirs(os.environ["WDM_CACHE_DIR"], exist_ok=True)
        os.makedirs(os.environ["SE_MANAGER_DRIVER_CACHE"], exist_ok=True)

        service = ChromeService()
        driver = webdriver.Chrome(service=service, options=options)
    except Exception:
        # The caller never receives the path when startup fails, so clean up here
        # instead of leaking one profile directory per failed launch.
        shutil.rmtree(user_data_dir, ignore_errors=True)
        raise

    print("[SELENIUM] WebDriver session created successfully.")
    return driver, user_data_dir


def build_gmail_service():
    """Build a Gmail API client that sends mail as ``SENDER_EMAIL``.

    Returns:
        The client returned by ``build("gmail", "v1", ...)``, or ``None`` when the
        service-account file is missing, ``SENDER_EMAIL`` is unset, or
        authentication raises.
    """
    if not os.path.exists(SERVICE_ACCOUNT_FILE):
        return None
    try:
        sender_email = os.environ.get("SENDER_EMAIL")
        if not sender_email:
            return None
        credentials = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            scopes=["https://www.googleapis.com/auth/gmail.send"],
        ).with_subject(sender_email)
        return build("gmail", "v1", credentials=credentials)
    except Exception as e:
        print(f"[GMAIL] Auth failed in final5.py: {e}")
        return None


GEMINI_MODEL = "gemini-1.5-flash"


class GeminiManager:
    """Wrap a Gemini model and move to the next API key when one stops working.

    Keys are tried in the order given. The key index only ever increases, so a key
    that has been rotated away from is not used again by this instance.
    """

    def __init__(self, api_keys: List[str]):
        self.api_keys = api_keys
        self.current_key_index = 0
        self.model = None
        self._setup_model()

    def _setup_model(self):
        """Configure ``self.model`` with the first key at or after the current index.

        Leaves ``self.model`` as ``None`` when there are no keys or every remaining
        key fails to configure.
        """
        if not self.api_keys:
            print("[GEMINI] No API keys provided")
            self.model = None
            return
        while self.current_key_index < len(self.api_keys):
            try:
                api_key = self.api_keys[self.current_key_index]
                genai.configure(api_key=api_key)
                self.model = genai.GenerativeModel(GEMINI_MODEL)
                print(f"[GEMINI] Using API key {self.current_key_index + 1}")
                return
            except Exception as e:
                print(f"[GEMINI] Failed to setup with key {self.current_key_index + 1}: {e}")
                self.current_key_index += 1
        print("[GEMINI] All API keys failed")
        self.model = None

    def rotate_key(self):
        """Abandon the current key and configure the next one, if any."""
        self.current_key_index += 1
        self._setup_model()

    def is_available(self):
        """Return True when a model is configured."""
        return self.model is not None

    def generate_content(self, prompt: str):
        """Send ``prompt`` to the model, retrying once on the next key if quota is exhausted.

        Raises:
            Exception: when no model is configured.
            ResourceExhausted: when quota is exhausted and no further key is available.
        """
        if not self.is_available():
            raise Exception("No available Gemini model")
        try:
            return self.model.generate_content(prompt)
        except ResourceExhausted:
            self.rotate_key()
            if self.is_available():
                return self.model.generate_content(prompt)
            raise


def _coerce_bool(value: Any) -> bool:
    """Interpret a JSON value as a boolean, treating strings such as "false" as False."""
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "1")
    return bool(value)


def ai_medical_intent(gemini_manager: GeminiManager, post_text: str, found_keywords: List[str]) -> Dict[str, Any]:
    """Ask Gemini whether a post is a personal request for medical help.

    Args:
        gemini_manager: Model wrapper; may be ``None`` or unavailable.
        post_text: Text of the post to classify.
        found_keywords: Keywords already matched in the post; included in the prompt
            and used as ``matched_keywords`` when the model omits that field.

    Returns:
        The JSON object returned by the model with ``is_medical_seeking`` normalized
        to a real ``bool``, or a fallback dict (``is_medical_seeking`` False) when the
        model is unavailable, returns no JSON object, or both attempts fail.
    """
    fallback = {
        "is_medical_seeking": False,
        "confidence": "low",
        "medical_summary": "AI unavailable",
        "suggested_services": [],
        "urgency_level": "low",
        "analysis": "Fallback",
        "reasoning": "AI error",
        "matched_keywords": found_keywords,
    }
    if not gemini_manager or not gemini_manager.is_available():
        return fallback

    keywords_str = ", ".join(found_keywords) if found_keywords else "none"
    prompt = f"""Analyze this social post to determine if the author is seeking medical help for a personal health need.
KEYWORDS: {keywords_str}
RULES:
1. Flag ONLY posts where someone seeks medical care for themselves or a loved one.
2. IGNORE posts about business, donations, selling products, jobs, or general info.
3. Flag ONLY if it is a PERSONAL HEALTH NEED.
Post: "{post_text}"
Return ONLY JSON:
{{
  "is_medical_seeking": true/false,
  "confidence": "high/medium/low",
  "medical_summary": "short summary",
  "suggested_services": ["service1","service2"],
  "urgency_level": "high/medium/low",
  "analysis": "why it's seeking help",
  "reasoning": "short explanation",
  "matched_keywords": ["keyword1"]
}}"""

    for _ in range(2):
        try:
            resp = gemini_manager.generate_content(prompt)
            txt = (getattr(resp, "text", "") or "").strip()
            # Take the outermost {...} span so surrounding prose or code fences are ignored.
            s, e = txt.find("{"), txt.rfind("}") + 1
            if s >= 0 and e > s:
                result = json.loads(txt[s:e])
                # bool("false") is True, so string answers need explicit handling.
                result["is_medical_seeking"] = _coerce_bool(result.get("is_medical_seeking", False))
                result.setdefault("matched_keywords", found_keywords)
                return result
            return fallback
        except ValueError as err:
            # Malformed JSON (json.JSONDecodeError is a ValueError) or an unreadable
            # response says nothing about the API key, so retry on the same key
            # rather than permanently discarding it.
            print(f"[GEMINI] Unusable response: {err}")
        except Exception as err:
            print(f"[GEMINI] Error: {err}")
            gemini_manager.rotate_key()
    return fallback


MEDICAL_KEYWORDS = [
    "doctor", "physician", "primary care", "healthcare", "medical", "clinic", "hospital", "urgent care",
    "emergency", "er", "specialist", "pediatrician", "dentist", "gynecologist", "obgyn", "women's health",
    "health center", "family doctor", "maternity", "prenatal", "postnatal", "labor", "delivery",
    "need doctor", "looking for doctor", "find doctor", "recommend doctor", "medical help", "health help",
    "appointment", "checkup", "treatment", "prescription", "medicine", "surgery", "best hospital",
    "best clinic", "where to go", "doctor recommendation", "pregnancy", "birth control", "contraception",
    "fertility", "hillside", "medical group", "wellness center",
]

# Very short keywords (currently only "er") occur inside countless ordinary words
# ("her", "other", "never"), so they must match as whole words. Longer keywords
# keep plain substring matching so plurals such as "doctors" still hit.
_WHOLE_WORD_MAX_LEN = 3
_WHOLE_WORD_PATTERNS = {
    kw: re.compile(r"(?<!\w)" + re.escape(kw) + r"(?!\w)")
    for kw in MEDICAL_KEYWORDS
    if len(kw) <= _WHOLE_WORD_MAX_LEN
}


def contains_keywords(text: str) -> Tuple[bool, List[str]]:
    """Find which ``MEDICAL_KEYWORDS`` occur in ``text`` (case-insensitive).

    Args:
        text: Text to search; ``None`` or empty yields no hits.

    Returns:
        ``(has_hit, hits)`` where ``hits`` lists the matched keywords in the order
        they appear in ``MEDICAL_KEYWORDS``.
    """
    tl = (text or "").lower()
    hits = []
    for kw in MEDICAL_KEYWORDS:
        pattern = _WHOLE_WORD_PATTERNS.get(kw)
        matched = pattern.search(tl) is not None if pattern else kw in tl
        if matched:
            hits.append(kw)
    return (bool(hits), hits)


def load_cookies(driver, cookies_file: str):
    """Load pickled cookies into ``driver`` and refresh Facebook to apply the session.

    Args:
        driver: Selenium WebDriver to receive the cookies.
        cookies_file: Path to a pickle file containing an iterable of cookie dicts.

    Raises:
        RuntimeError: when ``cookies_file`` does not exist.
    """
    # Fail before opening the browser page when there is nothing to load.
    if not os.path.exists(cookies_file):
        raise RuntimeError(f"[FB] FATAL: Cookies file not found at {cookies_file}")

    print("[FB] Navigating to Facebook homepage to load cookies...")
    # Cookies can only be added for the domain of the page currently loaded.
    driver.get("https://www.facebook.com")
    time.sleep(2)

    # SECURITY: unpickling executes arbitrary code embedded in the file. Only use a
    # cookies file produced by a trusted process.
    with open(cookies_file, "rb") as f:
        cookies = pickle.load(f)

    rejected = 0
    for cookie in cookies:
        # Replace sameSite values outside the three accepted ones.
        if "sameSite" in cookie and cookie["sameSite"] not in ["Strict", "Lax", "None"]:
            cookie["sameSite"] = "Lax"
        try:
            driver.add_cookie(cookie)
        except Exception:
            # Best effort: one unusable cookie should not abort the login attempt.
            rejected += 1
    if rejected:
        print(f"[FB] {rejected} cookie(s) could not be added and were skipped.")

    print("[FB] All cookies loaded. Refreshing page to apply session...")
    driver.refresh()
    time.sleep(5)
    if "log in" in driver.title.lower():
        print(f"[FB] WARNING: Login may have failed. Page title is: '{driver.title}'")
    else:
        print(f"[FB] Login appears successful. Page title is: '{driver.title}'")


def wait_group_feed(driver, wait):
    """Block until the group feed container is present in the page.

    Raises:
        TimeoutException: when the feed element does not appear within ``wait``'s timeout.
    """
    wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    try:
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@role='feed' or @data-pagelet='GroupFeed']")))
        print("[SCRAPE] Group feed detected.")
    except TimeoutException:
        raise TimeoutException("Timed out waiting for group feed to load.")


def scrape_group(driver, wait, group_url: str, max_scrolls: int, pause: float):
    """Scroll a group page and collect the unique texts of its article elements.

    Args:
        driver: Selenium WebDriver with an authenticated session.
        wait: ``WebDriverWait`` used to detect the feed.
        group_url: URL of the group to open.
        max_scrolls: Number of scroll-to-bottom iterations.
        pause: Seconds to sleep after each scroll.

    Returns:
        A list of dicts with keys ``id`` (1-based), ``text`` and ``group_link``.
    """
    print(f"[SCRAPE] Navigating to group: {group_url}")
    driver.get(group_url)
    wait_group_feed(driver, wait)

    # Elements whose text contains any of these UI fragments are skipped.
    ui_markers = ("Comment Share", "Write a comment...", "View more comments")
    posts, seen = [], set()
    for s in range(max_scrolls):
        print(f"[SCRAPE] --- Scroll {s+1}/{max_scrolls} ---")
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(pause)
        divs = driver.find_elements(By.XPATH, "//div[@role='article']")
        added_this_scroll = 0
        for d in divs:
            try:
                txt = (d.text or "").strip()
                if len(txt) < 25 or txt in seen:
                    continue
                if any(ui in txt for ui in ui_markers):
                    continue
                seen.add(txt)
                posts.append({"id": len(posts) + 1, "text": txt, "group_link": group_url})
                added_this_scroll += 1
            except StaleElementReferenceException:
                # The element was re-rendered between lookup and read; skip it.
                continue
        print(f"[SCRAPE] Found {added_this_scroll} new, unique posts this scroll.")
    print(f"[SCRAPE] Finished scraping. Total unique posts found: {len(posts)}")
    return posts


def try_scrape_with_fallback(group_url: str, cookies_file: str, max_scrolls: int, pause: float):
    """Run one complete scrape in a headless browser and always clean up afterwards.

    Returns:
        The list of posts produced by ``scrape_group``.

    Raises:
        Any exception raised while starting the browser, loading cookies or
        scraping; it is logged and re-raised after cleanup.
    """
    driver = None
    user_data_dir = None
    posts = []
    try:
        driver, user_data_dir = new_driver(headless=True)
        wait = WebDriverWait(driver, 30)
        load_cookies(driver, cookies_file)
        posts = scrape_group(driver, wait, group_url, max_scrolls, pause)
    except Exception as e:
        print(f"[SCRAPE] FATAL ERROR during scraping: {e}")
        raise
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                # Nothing useful can be done if the browser is already gone.
                pass
        if user_data_dir and os.path.exists(user_data_dir):
            # ignore_errors=True means rmtree itself never raises here.
            shutil.rmtree(user_data_dir, ignore_errors=True)
            print(f"[SELENIUM] Cleaned up user data directory: {user_data_dir}")
    return posts


def main():
    """Scrape the group, filter by keyword, classify with Gemini, and write both JSON outputs.

    Progress is reported on stdout, including machine-readable marker lines prefixed
    with ``::SCRAPE_SAVED::``, ``::KW_HIT::``, ``::AI_RESULT::`` and
    ``::ANALYSIS_SAVED::``.
    """
    args = get_args()
    os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True)
    os.makedirs(os.path.dirname(args.analysis_out) or ".", exist_ok=True)

    gemini_keys = [k.strip() for k in args.gemini_keys.split(",") if k.strip()] if args.gemini_keys else []
    gemini_manager = GeminiManager(gemini_keys)

    # NOTE(review): --headless is parsed but not forwarded; the browser is always
    # started headless by try_scrape_with_fallback.
    posts = try_scrape_with_fallback(args.group.strip(), args.cookies_file, args.max_scrolls, args.scroll_pause)
    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(posts, f, ensure_ascii=False, indent=2)
    print(f"[SCRAPE] Saved {len(posts)} scraped posts to {args.out}")
    print(f"::SCRAPE_SAVED::{args.out}")

    keyword_hits, confirmed = [], []
    for p in posts:
        has, hits = contains_keywords(p.get("text", ""))
        if has:
            p["found_keywords"] = hits
            keyword_hits.append(p)
            print(f"::KW_HIT::{json.dumps({'id': p['id'], 'found_keywords': hits}, ensure_ascii=False)}")

    per_call_sleep = 5  # seconds between consecutive AI calls
    for idx, p in enumerate(keyword_hits, start=1):
        found_kws = p.get("found_keywords", [])
        ai = ai_medical_intent(gemini_manager, p.get("text", ""), found_kws)
        p["ai_analysis"] = ai
        print(f"::AI_RESULT::{json.dumps({'id': p['id'], 'ai': ai}, ensure_ascii=False)}")
        if ai.get("is_medical_seeking"):
            confirmed.append(p)
        # No need to wait after the last call.
        if idx < len(keyword_hits):
            time.sleep(per_call_sleep)

    report = {
        "analysis_date": datetime.now().isoformat(),
        "group_link": args.group,
        "total_posts": len(posts),
        "keyword_hits": len(keyword_hits),
        "confirmed_medical": len(confirmed),
        "emails_sent": 0,
        "posts": confirmed,
    }
    with open(args.analysis_out, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"[ANALYSIS] Saved analysis to {args.analysis_out}")
    print(f"::ANALYSIS_SAVED::{args.analysis_out}")


if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Keep the summary line on stdout, but do not discard the cause.
        traceback.print_exc()
        print("Main execution failed. Exiting with error.")
        sys.exit(1)