# NOTE: removed non-Python page artifacts ("Spaces:", "Sleeping") that were
# captured along with this file from a Hugging Face Spaces page; they are not
# part of the script and would be syntax errors if executed.
| import os | |
| import re | |
| import sys | |
| import time | |
| import json | |
| import base64 | |
| import pickle | |
| import argparse | |
| import traceback | |
| import shutil | |
| from typing import List, Dict, Any, Tuple | |
| from datetime import datetime | |
| import tempfile | |
| try: | |
| sys.stdout.reconfigure(encoding="utf-8", errors="replace") | |
| sys.stderr.reconfigure(encoding="utf-8", errors="replace") | |
| except Exception: | |
| pass | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.chrome.service import Service as ChromeService | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import StaleElementReferenceException, NoSuchElementException, TimeoutException | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| import google.generativeai as genai | |
| from google.api_core.exceptions import ResourceExhausted | |
WRITABLE_DIR = "/tmp"
SERVICE_ACCOUNT_FILE = os.path.join(WRITABLE_DIR, "service_account.json")


def get_args():
    """Parse command-line options for the scrape/analyze/alert pipeline."""
    parser = argparse.ArgumentParser(
        description="Scrape one FB group, analyze, and email alerts."
    )
    parser.add_argument("--group", required=True)
    parser.add_argument("--out", required=True)
    parser.add_argument("--analysis-out", required=True)
    parser.add_argument("--recipients", default="")
    parser.add_argument("--sender", default=os.environ.get("SENDER_EMAIL", ""))
    parser.add_argument(
        "--cookies-file",
        default=os.path.join(WRITABLE_DIR, "facebook_cookies.pkl"),
    )
    # Scroll behaviour is overridable from the environment as well as the CLI.
    parser.add_argument(
        "--max-scrolls", type=int, default=int(os.environ.get("MAX_SCROLLS", "5"))
    )
    parser.add_argument(
        "--scroll-pause",
        type=float,
        default=float(os.environ.get("SCROLL_PAUSE", "3")),
    )
    parser.add_argument("--gemini-keys", default="")
    parser.add_argument("--headless", action="store_true")
    return parser.parse_args()
def new_driver(headless: bool) -> Tuple[webdriver.Chrome, str]:
    """Create a Chrome WebDriver using a throwaway profile under WRITABLE_DIR.

    Returns:
        (driver, user_data_dir) — the caller is responsible for quitting the
        driver and deleting the user-data directory.
    """
    profile_dir = tempfile.mkdtemp(prefix="chrome_user_data_", dir=WRITABLE_DIR)

    options = webdriver.ChromeOptions()
    options.add_argument(f"--user-data-dir={profile_dir}")
    if headless:
        options.add_argument("--headless=new")
    for flag in (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--disable-notifications",
        "--window-size=1920,1080",
        "--disable-extensions",
        "--remote-debugging-port=9222",
        "user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ):
        options.add_argument(flag)

    # Point HOME and the driver-manager caches at the writable directory
    # (presumably the rest of the filesystem is read-only — confirm in deploy env).
    os.environ.setdefault("HOME", WRITABLE_DIR)
    os.environ.setdefault("WDM_LOCAL", "1")
    os.environ.setdefault("WDM_CACHE_DIR", os.path.join(WRITABLE_DIR, ".wdm"))
    os.environ.setdefault("SE_MANAGER_DRIVER_CACHE", os.path.join(WRITABLE_DIR, "selenium"))
    os.makedirs(os.environ["WDM_CACHE_DIR"], exist_ok=True)
    os.makedirs(os.environ["SE_MANAGER_DRIVER_CACHE"], exist_ok=True)

    driver = webdriver.Chrome(service=ChromeService(), options=options)
    print("[SELENIUM] WebDriver session created successfully.")
    return driver, profile_dir
def build_gmail_service():
    """Build a Gmail API client from the on-disk service-account credentials.

    Returns:
        A Gmail v1 service object, or None when the service-account file is
        missing, SENDER_EMAIL is unset, or authentication fails. (Callers
        treat None as "email sending disabled".)

    Fix over the original: the missing-file case fell through the function
    body and returned None only implicitly; all failure paths are now
    explicit returns.
    """
    if not os.path.exists(SERVICE_ACCOUNT_FILE):
        return None
    sender_email = os.environ.get("SENDER_EMAIL")
    if not sender_email:
        return None
    try:
        # with_subject impersonates the sender; this requires the service
        # account to have domain-wide delegation for the gmail.send scope.
        credentials = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE,
            scopes=["https://www.googleapis.com/auth/gmail.send"],
        ).with_subject(sender_email)
        return build("gmail", "v1", credentials=credentials)
    except Exception as e:
        print(f"[GMAIL] Auth failed in final5.py: {e}")
        return None
GEMINI_MODEL = "gemini-1.5-flash"


class GeminiManager:
    """Round-robin manager over multiple Gemini API keys.

    Keeps one configured model at a time and rotates to the next key when a
    request fails with a quota error (ResourceExhausted).
    """

    def __init__(self, api_keys: List[str]):
        self.api_keys = api_keys        # ordered candidate API keys
        self.current_key_index = 0      # index of the key currently in use
        self.model = None               # configured GenerativeModel, or None
        self._setup_model()

    def _setup_model(self):
        """Configure a model with the first usable key at/after current_key_index."""
        if not self.api_keys:
            print("[GEMINI] No API keys provided")
            self.model = None
            return
        while self.current_key_index < len(self.api_keys):
            try:
                genai.configure(api_key=self.api_keys[self.current_key_index])
                self.model = genai.GenerativeModel(GEMINI_MODEL)
                print(f"[GEMINI] Using API key {self.current_key_index + 1}")
                return
            except Exception as e:
                print(f"[GEMINI] Failed to setup with key {self.current_key_index + 1}: {e}")
                self.current_key_index += 1
        print("[GEMINI] All API keys failed")
        self.model = None

    def rotate_key(self):
        """Advance to the next key and reconfigure the model."""
        self.current_key_index += 1
        self._setup_model()

    def is_available(self):
        """Return True when a model is configured and ready for requests."""
        return self.model is not None

    def generate_content(self, prompt: str):
        """Generate content, rotating through remaining keys on quota errors.

        Fix over the original: previously exactly ONE rotation was attempted
        after a ResourceExhausted error, so with three or more keys the later
        keys were never tried; now every remaining key gets a chance before
        the quota error propagates.

        Raises:
            Exception: when no model is available at all.
            ResourceExhausted: when every remaining key is quota-exhausted.
        """
        if not self.is_available():
            raise Exception("No available Gemini model")
        while True:
            try:
                return self.model.generate_content(prompt)
            except ResourceExhausted:
                self.rotate_key()
                if not self.is_available():
                    raise  # all keys exhausted; propagate the quota error
def ai_medical_intent(gemini_manager: GeminiManager, post_text: str, found_keywords: List[str]) -> Dict[str, Any]:
    """Ask Gemini whether *post_text* is a personal medical-help request.

    Args:
        gemini_manager: configured GeminiManager; may be None or unavailable.
        post_text: raw post text to classify (interpolated into the prompt).
        found_keywords: keyword hits already detected; echoed into the result
            when the model omits "matched_keywords".

    Returns:
        The model's parsed JSON verdict as a dict, or a conservative fallback
        dict (is_medical_seeking=False) when the AI is unavailable or fails.
    """
    # Conservative default returned on every failure path.
    fallback = {
        "is_medical_seeking": False,
        "confidence": "low",
        "medical_summary": "AI unavailable",
        "suggested_services": [],
        "urgency_level": "low",
        "analysis": "Fallback",
        "reasoning": "AI error",
        "matched_keywords": found_keywords
    }
    if not gemini_manager or not gemini_manager.is_available():
        return fallback
    keywords_str = ", ".join(found_keywords) if found_keywords else "none"
    prompt = f"""Analyze this social post to determine if the author is seeking medical help for a personal health need.
KEYWORDS: {keywords_str}
RULES:
1. Flag ONLY posts where someone seeks medical care for themselves or a loved one.
2. IGNORE posts about business, donations, selling products, jobs, or general info.
3. Flag ONLY if it is a PERSONAL HEALTH NEED.
Post: "{post_text}"
Return ONLY JSON:
{{
"is_medical_seeking": true/false,
"confidence": "high/medium/low",
"medical_summary": "short summary",
"suggested_services": ["service1","service2"],
"urgency_level": "high/medium/low",
"analysis": "why it's seeking help",
"reasoning": "short explanation",
"matched_keywords": ["keyword1"]
}}"""
    # Up to two attempts; an exception rotates the API key before retrying.
    for _ in range(2):
        try:
            resp = gemini_manager.generate_content(prompt)
            txt = (getattr(resp, "text", "") or "").strip()
            # Extract the outermost {...} span — models often wrap JSON in prose.
            s, e = txt.find("{"), txt.rfind("}") + 1
            if s >= 0 and e > s:
                result = json.loads(txt[s:e])
                # Normalize the flag to a real bool and guarantee the
                # matched_keywords field exists for downstream consumers.
                result["is_medical_seeking"] = bool(result.get("is_medical_seeking", False))
                if "matched_keywords" not in result:
                    result["matched_keywords"] = found_keywords
                return result
            # Reply contained no JSON object: give up immediately (no retry).
            return fallback
        except Exception as e:
            print(f"[GEMINI] Error: {e}")
            gemini_manager.rotate_key()
    return fallback
# Substring triggers for the cheap pre-filter that runs before the AI pass.
# NOTE(review): matching is plain substring matching, so very short entries
# such as "er" will also fire inside unrelated words (e.g. "dinner") —
# confirm that is intended.
MEDICAL_KEYWORDS = [
    "doctor","physician","primary care","healthcare","medical","clinic","hospital","urgent care","emergency","er",
    "specialist","pediatrician","dentist","gynecologist","obgyn","women's health","health center","family doctor",
    "maternity","prenatal","postnatal","labor","delivery","need doctor","looking for doctor","find doctor",
    "recommend doctor","medical help","health help","appointment","checkup","treatment","prescription","medicine",
    "surgery","best hospital","best clinic","where to go","doctor recommendation","pregnancy","birth control",
    "contraception","fertility","hillside","medical group","wellness center"
]


def contains_keywords(text: str) -> Tuple[bool, List[str]]:
    """Return (any_hit, hits) for keywords found in *text*, case-insensitively.

    *text* may be None or empty; both yield (False, []).
    """
    lowered = (text or "").lower()
    hits = [keyword for keyword in MEDICAL_KEYWORDS if keyword in lowered]
    return (bool(hits), hits)
def load_cookies(driver, cookies_file: str):
    """Load pickled Facebook cookies into *driver* and refresh to apply them.

    Raises:
        RuntimeError: when *cookies_file* does not exist.
    """
    print("[FB] Navigating to Facebook homepage to load cookies...")
    driver.get("https://www.facebook.com")
    time.sleep(2)

    if not os.path.exists(cookies_file):
        raise RuntimeError(f"[FB] FATAL: Cookies file not found at {cookies_file}")
    with open(cookies_file, "rb") as f:
        cookies = pickle.load(f)

    for cookie in cookies:
        # Chrome rejects cookies whose sameSite is not one of these values.
        if "sameSite" in cookie and cookie["sameSite"] not in ("Strict", "Lax", "None"):
            cookie["sameSite"] = "Lax"
        try:
            driver.add_cookie(cookie)
        except Exception:
            pass  # best effort: skip cookies the browser refuses

    print("[FB] All cookies loaded. Refreshing page to apply session...")
    driver.refresh()
    time.sleep(5)

    # Heuristic login check: the login page's title contains "log in".
    if "log in" in driver.title.lower():
        print(f"[FB] WARNING: Login may have failed. Page title is: '{driver.title}'")
    else:
        print(f"[FB] Login appears successful. Page title is: '{driver.title}'")
def wait_group_feed(driver, wait):
    """Block until the group feed container is present, or raise TimeoutException."""
    wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    try:
        wait.until(
            EC.presence_of_element_located(
                (By.XPATH, "//div[@role='feed' or @data-pagelet='GroupFeed']")
            )
        )
    except TimeoutException:
        # Re-raise with a clearer message for the caller's error log.
        raise TimeoutException("Timed out waiting for group feed to load.")
    print("[SCRAPE] Group feed detected.")
def scrape_group(driver, wait, group_url: str, max_scrolls: int, pause: float):
    """Scroll the group feed *max_scrolls* times and collect unique post texts.

    Returns:
        List of {"id", "text", "group_link"} dicts, de-duplicated by exact
        text and filtered of short snippets and obvious comment-UI fragments.
    """
    print(f"[SCRAPE] Navigating to group: {group_url}")
    driver.get(group_url)
    wait_group_feed(driver, wait)

    ui_fragments = ("Comment Share", "Write a comment...", "View more comments")
    posts = []
    seen_texts = set()
    for scroll_num in range(1, max_scrolls + 1):
        print(f"[SCRAPE] --- Scroll {scroll_num}/{max_scrolls} ---")
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(pause)
        new_count = 0
        for article in driver.find_elements(By.XPATH, "//div[@role='article']"):
            try:
                text = (article.text or "").strip()
            except StaleElementReferenceException:
                continue  # element detached mid-scroll; skip it
            # Drop trivially short snippets and anything already collected.
            if len(text) < 25 or text in seen_texts:
                continue
            # Drop comment-widget chrome masquerading as article text.
            if any(fragment in text for fragment in ui_fragments):
                continue
            seen_texts.add(text)
            posts.append({"id": len(posts) + 1, "text": text, "group_link": group_url})
            new_count += 1
        print(f"[SCRAPE] Found {new_count} new, unique posts this scroll.")
    print(f"[SCRAPE] Finished scraping. Total unique posts found: {len(posts)}")
    return posts
def try_scrape_with_fallback(group_url: str, cookies_file: str, max_scrolls: int, pause: float):
    """Run one headless scrape of *group_url*, always cleaning up the browser.

    Returns:
        The scraped posts list (empty if nothing was collected).

    Raises:
        Re-raises any scraping failure after logging and cleanup.
    """
    driver = None
    user_data_dir = None
    posts = []
    try:
        driver, user_data_dir = new_driver(headless=True)
        wait = WebDriverWait(driver, 30)
        load_cookies(driver, cookies_file)
        posts = scrape_group(driver, wait, group_url, max_scrolls, pause)
    except Exception as e:
        print(f"[SCRAPE] FATAL ERROR during scraping: {e}")
        raise
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                pass  # best effort: the session may already be gone
        if user_data_dir and os.path.exists(user_data_dir):
            # ignore_errors=True means rmtree swallows removal failures, so
            # the try/except the original wrapped around it was dead code.
            shutil.rmtree(user_data_dir, ignore_errors=True)
            print(f"[SELENIUM] Cleaned up user data directory: {user_data_dir}")
    return posts
def main():
    """Entry point: scrape one group, keyword-filter, AI-classify, write reports."""
    args = get_args()
    # Ensure both output directories exist ("." when the path has no dir part).
    for out_path in (args.out, args.analysis_out):
        os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    if args.gemini_keys:
        gemini_keys = [k.strip() for k in args.gemini_keys.split(",") if k.strip()]
    else:
        gemini_keys = []
    gemini_manager = GeminiManager(gemini_keys)

    posts = try_scrape_with_fallback(args.group.strip(), args.cookies_file, args.max_scrolls, args.scroll_pause)
    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(posts, f, ensure_ascii=False, indent=2)
    print(f"[SCRAPE] Saved {len(posts)} scraped posts to {args.out}")
    print(f"::SCRAPE_SAVED::{args.out}")

    # Cheap keyword pre-filter before spending rate-limited AI calls.
    keyword_hits = []
    confirmed = []
    for post in posts:
        matched, hits = contains_keywords(post.get("text", ""))
        if not matched:
            continue
        post["found_keywords"] = hits
        keyword_hits.append(post)
        print(f"::KW_HIT::{json.dumps({'id': post['id'], 'found_keywords': hits}, ensure_ascii=False)}")

    per_call_sleep = 5  # crude rate limit between consecutive Gemini calls
    for idx, post in enumerate(keyword_hits, start=1):
        verdict = ai_medical_intent(gemini_manager, post.get("text", ""), post.get("found_keywords", []))
        post["ai_analysis"] = verdict
        print(f"::AI_RESULT::{json.dumps({'id': post['id'], 'ai': verdict}, ensure_ascii=False)}")
        if verdict.get("is_medical_seeking"):
            confirmed.append(post)
        if idx < len(keyword_hits):  # no sleep needed after the final call
            time.sleep(per_call_sleep)

    report = {
        "analysis_date": datetime.now().isoformat(),
        "group_link": args.group,
        "total_posts": len(posts),
        "keyword_hits": len(keyword_hits),
        "confirmed_medical": len(confirmed),
        "emails_sent": 0,
        "posts": confirmed,
    }
    with open(args.analysis_out, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"[ANALYSIS] Saved analysis to {args.analysis_out}")
    print(f"::ANALYSIS_SAVED::{args.analysis_out}")
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Fix: the original swallowed the exception with no diagnostic detail
        # even though `traceback` is imported at the top of the file; print
        # the full stack trace before exiting non-zero.
        traceback.print_exc()
        print("Main execution failed. Exiting with error.")
        sys.exit(1)