Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import time | |
| import base64 | |
| import pickle | |
| import subprocess | |
| import threading | |
| import traceback | |
| import html | |
| import binascii | |
| from datetime import datetime | |
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Any, Optional | |
| from flask import Flask, request, jsonify, send_from_directory | |
| from flask_cors import CORS | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| from dotenv import load_dotenv | |
# Populate os.environ from a local .env file before the configuration
# constants below are read.
load_dotenv()
class LogBuffer:
    """Thread-safe, bounded in-memory log ring feeding the UI's polling endpoint.

    Entries are dicts: {"id", "ts", "level", "source", "msg"} with a
    monotonically increasing integer id used as the client's poll cursor.
    """

    def __init__(self, max_items: int = 10000):
        self._buf: List[Dict[str, Any]] = []
        self._lock = threading.Lock()
        self._next_id = 1      # id to assign to the next appended entry
        self._max = max_items  # cap; oldest entries are dropped beyond this

    def append(self, msg: str, level: str = "info", source: str = "server"):
        """Append one entry, trimming the buffer to the newest `max_items`."""
        ts = datetime.now().strftime("%H:%M:%S")
        with self._lock:
            # Fix: read and bump the id inside the lock. The original built
            # the entry (reading self._next_id) before acquiring the lock,
            # so two concurrent appends could receive the same id.
            line = {"id": self._next_id, "ts": ts, "level": level,
                    "source": source, "msg": msg}
            self._next_id += 1
            self._buf.append(line)
            if len(self._buf) > self._max:
                self._buf = self._buf[-self._max:]

    def clear(self):
        """Drop all buffered entries (ids keep increasing across clears)."""
        with self._lock:
            self._buf.clear()

    def get_after(self, after_id: int, limit: int = 500):
        """Return (entries with id > after_id, id of the newest entry).

        after_id <= 0 means "tail": just the newest `limit` entries. When
        the buffer is empty the caller's after_id is echoed back so its
        polling cursor never moves backwards.
        """
        with self._lock:
            if after_id <= 0:
                data = self._buf[-limit:]
            else:
                data = [x for x in self._buf if x["id"] > after_id][:limit]
            last_id = self._buf[-1]["id"] if self._buf else after_id
            return data, last_id
# Singleton buffer consumed by the log-polling API handler.
logs = LogBuffer()


def log(msg: str, level: str = "info", source: str = "server"):
    """Record a message in the shared LogBuffer and mirror it to stdout.

    flush=True keeps container/platform log streams in real time.
    """
    logs.append(msg, level, source)
    print(f"[{level.upper()}][{source}] {msg}", flush=True)
def decode_base64_with_padding(b64_string: str) -> bytes:
    """Decode a base64 string, tolerating missing '=' padding.

    Returns b"" (after logging the error) when the input is not valid
    base64, so callers can treat the secret as simply absent.
    """
    remainder = len(b64_string) % 4
    padded = b64_string + "=" * (4 - remainder) if remainder else b64_string
    try:
        return base64.b64decode(padded)
    except binascii.Error as e:
        log(f"Error decoding base64 string: {e}", "error", "SERVER")
        return b""
| WRITABLE_DIR = "/tmp" | |
| COOKIES_PATH = os.path.join(WRITABLE_DIR, "facebook_cookies.pkl") | |
| SERVICE_ACCOUNT_FILE = os.path.join(WRITABLE_DIR, "service_account.json") | |
| if "FB_COOKIES_B64" in os.environ: | |
| decoded_cookies = decode_base64_with_padding(os.environ["FB_COOKIES_B64"]) | |
| if decoded_cookies: | |
| with open(COOKIES_PATH, "wb") as f: | |
| f.write(decoded_cookies) | |
| if "SERVICE_ACCOUNT_B64" in os.environ: | |
| decoded_service_account = decode_base64_with_padding(os.environ["SERVICE_ACCOUNT_B64"]) | |
| if decoded_service_account: | |
| with open(SERVICE_ACCOUNT_FILE, "w") as f: | |
| f.write(decoded_service_account.decode("utf-8")) | |
| GROUPS_TXT = os.environ.get("GROUPS_TXT", "groups.txt") | |
| FINAL5_PATH = os.environ.get("FINAL5_PATH", "final5.py") | |
| PYTHON_BIN = os.environ.get("PYTHON_BIN", "python") | |
| SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "smahato@hillsidemedicalgroup.com") | |
| SCRAPE_OUTDIR = os.path.join(WRITABLE_DIR, "scraped") | |
| ANALYSIS_OUTDIR = os.path.join(WRITABLE_DIR, "analysis") | |
| GEMINI_KEYS = [] | |
| for i in range(1, 6): | |
| key = os.environ.get(f"GEMINI_API_KEY_{i}") | |
| if key: | |
| GEMINI_KEYS.append(key) | |
| GMAIL_SCOPES = ["https://www.googleapis.com/auth/gmail.send"] | |
| os.makedirs(SCRAPE_OUTDIR, exist_ok=True) | |
| os.makedirs(ANALYSIS_OUTDIR, exist_ok=True) | |
def build_gmail_service():
    """Build an authenticated Gmail API client from the service-account file.

    Impersonates SENDER_EMAIL via domain-wide delegation
    (`with_subject`). Returns the service object, or None when the
    credentials file is missing or construction fails — the server keeps
    running without email in that case.
    """
    if not os.path.exists(SERVICE_ACCOUNT_FILE):
        log("Service account file not found, Gmail unavailable.", "error", "GMAIL")
        return None
    try:
        creds = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE, scopes=GMAIL_SCOPES
        ).with_subject(SENDER_EMAIL)
        service = build("gmail", "v1", credentials=creds)
        log("Gmail service built successfully using service account.", "info", "GMAIL")
        return service
    except Exception as e:
        log(f"Failed to build Gmail service: {e}", "error", "GMAIL")
        log(f"CRITICAL: Ensure your service account has Domain-Wide Delegation enabled for the user {SENDER_EMAIL}", "error", "GMAIL")
        return None


# Built once at import time; None when Gmail is not configured.
gmail_service = build_gmail_service()
@dataclass
class GroupRun:
    """Per-group progress/result record for one pipeline run.

    Fix: the @dataclass decorator was missing. The orchestrator constructs
    GroupRun(link=..., stage=...) with keyword arguments, which a plain
    class with class-level attributes does not support.
    """
    link: str                       # Facebook group URL
    stage: str = "pending"          # pending -> running -> done | error
    scraped_json: str = ""          # path to the raw scraped-posts JSON
    analysis_json: str = ""         # path to the AI-analysis JSON
    scraped_posts: int = 0          # number of posts scraped
    detected_posts: int = 0         # posts confirmed as medical by the AI
    emails_sent_by_final5: int = 0  # emails the worker sent itself
    error: str = ""                 # human-readable failure reason, if any


@dataclass
class PipelineState:
    """Mutable global pipeline state polled by the status API handler.

    Fix: the @dataclass decorator was missing. Without it,
    field(default_factory=list) leaves Field objects as class attributes
    instead of giving each instance fresh lists.
    """
    running: bool = False
    message: str = "idle"
    progress: int = 0               # overall percentage, 0-100
    current: int = 0                # 1-based index of the group in flight
    total: int = 0                  # total number of groups this run
    groups: List[GroupRun] = field(default_factory=list)
    recipients: List[str] = field(default_factory=list)
    summary_path: str = ""          # path of the written run-summary JSON
# Serve the SPA (index.html and assets) straight from the working
# directory, and allow cross-origin requests from the frontend.
app = Flask(__name__, static_folder=".", static_url_path="")
CORS(app)
# Live, per-group scrape state pushed to the UI; all access goes through
# live_lock.
live_lock = threading.Lock()
live_state: Dict[str, Any] = {
    "group": None,   # link of the group currently being processed
    "counts": {"total_posts": 0, "kw_hits": 0, "ai_done": 0, "confirmed": 0, "emails": 0},
    "posts": []      # post dicts, enriched as '::' worker events arrive
}
def reset_live_state(group_link: str):
    """Point the live view at a new group and zero every counter and post."""
    fresh_counts = {"total_posts": 0, "kw_hits": 0, "ai_done": 0, "confirmed": 0, "emails": 0}
    with live_lock:
        live_state["group"] = group_link
        live_state["counts"] = fresh_counts
        live_state["posts"] = []
def ensure_post_obj(pid: int) -> Dict[str, Any]:
    """Return the live post dict with id `pid`, creating it if absent.

    The returned dict is the one stored in live_state, so callers may
    mutate it in place.
    """
    with live_lock:
        existing = next(
            (post for post in live_state["posts"] if post.get("id") == pid),
            None,
        )
        if existing is not None:
            return existing
        created = {"id": pid, "text": "", "group_link": live_state.get("group")}
        live_state["posts"].append(created)
        return created
def load_scraped_into_live(path: str):
    """Replace the live post list with the scraped-posts JSON at `path`.

    Read/parse failures are logged and swallowed so a bad file never
    kills the run.
    """
    try:
        with open(path, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
    except Exception as e:
        log(f"live load error: {e}", "error", "LIVE")
        return
    with live_lock:
        live_state["counts"]["total_posts"] = len(loaded)
        live_state["posts"] = loaded
def handle_event_line(line: str):
    """Parse one '::EVENT::' control line from the worker's output and fold
    it into live_state.

    Recognized markers (payload follows the marker on the same line):
      ::SCRAPE_SAVED::<path>  - scraped JSON written; load it into the live view
      ::KW_HIT::<json>        - keyword match: {"id", "found_keywords"}
      ::AI_RESULT::<json>     - AI verdict: {"id", "ai": {...}}
      ::EMAIL_SENT::<json>    - emails dispatched: {"id", "sent"}

    Malformed payloads are logged and ignored; non-'::' lines are skipped.
    """
    if not line.startswith("::"):
        return
    try:
        if "::SCRAPE_SAVED::" in line:
            path = line.split("::SCRAPE_SAVED::", 1)[1].strip()
            if path:
                load_scraped_into_live(path)
        elif "::KW_HIT::" in line:
            d = json.loads(line.split("::KW_HIT::", 1)[1].strip())
            # ensure_post_obj acquires live_lock internally, so the counter
            # update below re-acquires it in a separate, non-nested step.
            p = ensure_post_obj(int(d["id"]))
            p["found_keywords"] = d.get("found_keywords", [])
            with live_lock:
                live_state["counts"]["kw_hits"] += 1
        elif "::AI_RESULT::" in line:
            d = json.loads(line.split("::AI_RESULT::", 1)[1].strip())
            p = ensure_post_obj(int(d["id"]))
            ai = d.get("ai", {})
            p["ai"] = ai
            with live_lock:
                live_state["counts"]["ai_done"] += 1
                if ai.get("is_medical_seeking"):
                    live_state["counts"]["confirmed"] += 1
        elif "::EMAIL_SENT::" in line:
            d = json.loads(line.split("::EMAIL_SENT::", 1)[1].strip())
            p = ensure_post_obj(int(d["id"]))
            sent = int(d.get("sent", 0))
            p["email_sent"] = sent > 0
            if sent > 0:
                with live_lock:
                    live_state["counts"]["emails"] += sent
    except Exception as e:
        log(f"live parse error: {e}", "error", "LIVE")
def read_groups(path: str) -> List[str]:
    """Return the non-empty, whitespace-stripped lines of `path`.

    A missing file yields an empty list rather than an error.
    """
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as handle:
        raw_lines = handle.read().splitlines()
    return [stripped for stripped in (ln.strip() for ln in raw_lines) if stripped]
def slugify(url: str) -> str:
    """Turn a URL into a filesystem-safe slug.

    Runs of non-alphanumeric characters collapse to single dashes; the
    result is lowercased with no leading/trailing dash.
    """
    dashed = re.sub(r"[^a-zA-Z0-9]+", "-", url).strip("-")
    return dashed.lower()
def send_html_email(to_emails: List[str], subject: str, html_content: str) -> int:
    """Send `html_content` as an individual HTML email to each address via Gmail.

    Returns the number of messages successfully sent. When the Gmail
    service was never built (no/bad service account) this is a no-op
    returning 0. Per-recipient failures are logged and do not stop the
    remaining sends.
    """
    if not gmail_service:
        # Consistency fix: this function logged under lowercase "gmail"
        # while every other Gmail log line in the module uses "GMAIL".
        log("Gmail not configured; skipping email", "warn", "GMAIL")
        return 0
    from email.message import EmailMessage
    sent = 0
    for to in to_emails:
        try:
            msg = EmailMessage()
            msg["to"] = to
            msg["from"] = SENDER_EMAIL
            msg["subject"] = subject
            msg.set_content(html_content, subtype="html")
            # Gmail API expects the RFC 2822 message, URL-safe base64-encoded.
            raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
            gmail_service.users().messages().send(userId="me", body={"raw": raw}).execute()
            sent += 1
            log(f"Successfully sent email to {to}", "info", "GMAIL")
        except HttpError as e:
            log(f"Gmail HTTP error to {to}: {e}", "error", "GMAIL")
        except Exception as e:
            log(f"Gmail send error to {to}: {e}", "error", "GMAIL")
    return sent
def build_confirmed_posts_email(groups_run: List["GroupRun"], all_confirmed_posts: List[Dict[str, Any]]) -> str:
    """Render the consolidated end-of-run email as a standalone HTML document.

    groups_run: per-group result records for this run.
    all_confirmed_posts: every AI-confirmed post across all groups; each
        dict may carry "id", "text", "group_link" and an "ai_analysis"
        sub-dict ("urgency_level", "confidence", "medical_summary").
    """
    total_groups = len(groups_run)
    total_scraped = sum(g.scraped_posts for g in groups_run)
    total_confirmed = len(all_confirmed_posts)
    # One <tr> per group: link, scraped count, confirmed count, status.
    rows = []
    for g in groups_run:
        rows.append(f"""
        <tr>
            <td style="padding: 8px; border-bottom: 1px solid #eee;"><a href="{g.link}" target="_blank">{g.link}</a></td>
            <td style="padding: 8px; border-bottom: 1px solid #eee; text-align: center;">{g.scraped_posts}</td>
            <td style="padding: 8px; border-bottom: 1px solid #eee; text-align: center;">{g.detected_posts}</td>
            <td style="padding: 8px; border-bottom: 1px solid #eee;">{"OK" if g.stage == "done" else "ERROR"}</td>
        </tr>""")
    summary_table_html = f"""<h3>Group Summary</h3>
    <table style="width: 100%; border-collapse: collapse; margin-top: 8px; border: 1px solid #ddd;">
        <thead>
            <tr style="background: #0f172a; color: #fff;">
                <th style="text-align: left; padding: 8px;">Group Link</th>
                <th style="text-align: center; padding: 8px;">Posts Scraped</th>
                <th style="text-align: center; padding: 8px;">Confirmed Posts</th>
                <th style="text-align: left; padding: 8px;">Status</th>
            </tr>
        </thead>
        <tbody>
            {''.join(rows)}
        </tbody>
    </table>"""
    if all_confirmed_posts:
        # NOTE(review): medical_summary and text are html.escape()d, but the
        # urgency/confidence values in the <h4> are interpolated raw —
        # confirm upstream guarantees they are plain scalars.
        posts_html = "".join(
            f"""
            <div style="margin-bottom: 25px; padding: 12px; border: 1px solid #ddd; border-radius: 5px; background-color: #fafafa;">
                <h4 style="margin-top: 0; margin-bottom: 8px;">Post ID: {p.get("id", "N/A")} | Urgency: {p.get("ai_analysis", {}).get("urgency_level", "N/A")} | Confidence: {p.get("ai_analysis", {}).get("confidence", "N/A")}</h4>
                <p style="margin: 5px 0;"><strong>Summary:</strong> {html.escape(p.get("ai_analysis", {}).get("medical_summary", "N/A"))}</p>
                <p style="margin: 5px 0;"><strong>Text:</strong></p>
                <pre style="white-space: pre-wrap; background-color: #f0f0f0; padding: 8px; border: 1px solid #eee; border-radius: 3px; font-family: monospace; font-size: 0.9em;">{html.escape(p.get("text", "N/A"))}</pre>
                <p style="margin: 5px 0;"><a href="{p.get("group_link", "#")}" target="_blank">View Group</a></p>
            </div>"""
            for p in all_confirmed_posts
        )
    else:
        posts_html = "<p>No confirmed medical posts were found during this run.</p>"
    return f"""<!DOCTYPE html>
<html>
<head><title>Hillside Medical Group - Confirmed Medical Posts Summary</title></head>
<body style="font-family: Arial, sans-serif; margin: 0; padding: 0; background-color: #f5f5f5;">
    <div style="max-width: 900px; margin: 20px auto; padding: 20px; background-color: #ffffff; border: 1px solid #e0e0e0; border-radius: 8px;">
        <div style="background: #1e3c72; color: #fff; padding: 16px 20px; border-radius: 6px 6px 0 0;">
            <h2 style="margin: 0;">Hillside Medical Group - Confirmed Medical Posts</h2>
            <div style="font-size: 0.9em;">Run completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div>
        </div>
        <div style="padding: 16px;">
            <p><strong>Overall Summary:</strong> Processed {total_groups} groups, scraped {total_scraped} posts, found {total_confirmed} confirmed medical posts.</p>
            <hr style="margin: 20px 0; border: 0; border-top: 1px solid #eee;">
            {summary_table_html}
            <hr style="margin: 20px 0; border: 0; border-top: 1px solid #eee;">
            <h3>Confirmed Posts Details</h3>
            {posts_html}
        </div>
        <div style="margin-top: 20px; padding: 10px; font-size: 0.8em; color: #666; border-top: 1px solid #eee;">
            <p>This email contains posts identified as potentially seeking personal medical help. Please review and take appropriate action.</p>
            <p><em>Note: The link provided is to the group. Direct post links are not currently extracted.</em></p>
        </div>
    </div>
</body>
</html>"""
# Global pipeline state shared between the worker thread and API handlers.
state = PipelineState()
def stream_process_lines(args: List[str], env: Optional[Dict[str, str]] = None, tag: str = "FINAL5") -> int:
    """Run `args` as a subprocess, streaming its stdout/stderr into the log.

    Lines beginning with '::' are additionally routed through
    handle_event_line() to update live_state. Returns the child's exit code.
    """
    log(f"Exec: {' '.join(args)}", "info", tag)
    proc = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered so '::' event lines arrive promptly
        universal_newlines=True,
        env=env or os.environ.copy()
    )
    def pump(pipe, name):
        # Drain one pipe to the log; stderr lines are logged at "warn".
        for raw in pipe:
            line = (raw or "").rstrip("\n")
            if not line:
                continue
            if line.startswith("::"):
                try:
                    handle_event_line(line)
                except Exception as e:
                    log(f"event parse error: {e}", "error", tag)
            log(line, "info" if name == "stdout" else "warn", tag)
    # Separate reader threads keep a full stderr pipe from deadlocking the
    # child while stdout is being consumed (and vice versa).
    t1 = threading.Thread(target=pump, args=(proc.stdout, "stdout"), daemon=True)
    t2 = threading.Thread(target=pump, args=(proc.stderr, "stderr"), daemon=True)
    t1.start()
    t2.start()
    rc = proc.wait()
    # NOTE(review): the 0.2s join may drop a few trailing buffered lines
    # right after exit; threads are daemonic so shutdown never blocks.
    t1.join(timeout=0.2)
    t2.join(timeout=0.2)
    log(f"Exit code: {rc}", "info", tag)
    return rc
def call_final5_for_group(group_url: str, out_json: str, analysis_json: str, recipients: List[str]) -> Dict[str, Any]:
    """Invoke the final5.py worker for one group and wait for it to finish.

    Returns {"ok": <exit code was 0>, "code": <exit code>}.
    """
    cmd = [
        PYTHON_BIN, FINAL5_PATH,
        "--group", group_url,
        "--out", out_json,
        "--analysis-out", analysis_json,
        "--recipients", ",".join(recipients),
        "--sender", SENDER_EMAIL,
        "--cookies-file", COOKIES_PATH,
        "--headless"
    ]
    if GEMINI_KEYS:
        cmd += ["--gemini-keys", ",".join(GEMINI_KEYS)]
    # Force unbuffered UTF-8 output so '::' event lines stream in real time,
    # and point every cache (webdriver-manager, Selenium) at the writable dir.
    child_env = os.environ.copy()
    child_env["PYTHONUNBUFFERED"] = "1"
    child_env["PYTHONIOENCODING"] = "utf-8"
    child_env.setdefault("HOME", WRITABLE_DIR)
    child_env.setdefault("WDM_LOCAL", "1")
    child_env.setdefault("WDM_CACHE_DIR", os.path.join(WRITABLE_DIR, ".wdm"))
    child_env.setdefault("SE_MANAGER_DRIVER_CACHE", os.path.join(WRITABLE_DIR, "selenium"))
    os.makedirs(child_env["WDM_CACHE_DIR"], exist_ok=True)
    os.makedirs(child_env["SE_MANAGER_DRIVER_CACHE"], exist_ok=True)
    exit_code = stream_process_lines(cmd, env=child_env, tag="FINAL5")
    return {"ok": exit_code == 0, "code": exit_code}
def run_pipeline(recipients: List[str]):
    """Background-thread entry point: process every group in GROUPS_TXT.

    For each group it runs the final5.py worker, reads back the worker's
    output files for counts, then sends one consolidated HTML email and
    writes a run-summary JSON. Progress is mirrored into the module-level
    `state` so the HTTP status handler can report it.
    """
    try:
        logs.clear()
        log("Pipeline starting", "info", "ORCHESTRATOR")
        state.running = True
        state.message = "initializing"
        state.progress = 0
        state.recipients = recipients
        state.groups.clear()
        links = read_groups(GROUPS_TXT)
        state.total = len(links)
        if not links:
            log("No groups found in groups.txt", "warn", "ORCHESTRATOR")
            state.message = "No groups"
            state.running = False
            return
        all_confirmed_posts = []
        for i, link in enumerate(links, start=1):
            reset_live_state(link)
            g = GroupRun(link=link, stage="running")
            state.groups.append(g)
            state.current = i
            state.message = f"Processing {link}"
            # Progress reflects the completed fraction before this group runs.
            state.progress = int(((i - 1) / max(1, state.total)) * 100)
            log(f"[{i}/{state.total}] Processing group: {link}", "info", "ORCHESTRATOR")
            slug = slugify(link)
            out_json = os.path.join(SCRAPE_OUTDIR, f"{slug}.json")
            analysis_json = os.path.join(ANALYSIS_OUTDIR, f"analysis_{slug}.json")
            g.scraped_json = out_json
            g.analysis_json = analysis_json
            result = call_final5_for_group(link, out_json, analysis_json, recipients)
            if not result.get("ok"):
                g.stage = "error"
                g.error = f"final5 exit code {result.get('code')}"
                log(f"final5 failed for {link}: code {result.get('code')}", "error", "ORCHESTRATOR")
            else:
                # Worker succeeded: read back its output files for counts and
                # collect this group's confirmed posts for the summary email.
                try:
                    if os.path.exists(out_json):
                        with open(out_json, "r", encoding="utf-8") as f:
                            g.scraped_posts = len(json.load(f))
                    if os.path.exists(analysis_json):
                        with open(analysis_json, "r", encoding="utf-8") as f:
                            a = json.load(f)
                        g.detected_posts = a.get("confirmed_medical", 0)
                        g.emails_sent_by_final5 = a.get("emails_sent", 0)
                        confirmed_posts = a.get("posts", [])
                        # Tag each post with its source group for the email.
                        for post in confirmed_posts:
                            if "group_link" not in post:
                                post["group_link"] = link
                        all_confirmed_posts.extend(confirmed_posts)
                    g.stage = "done"
                    log(f"Group done: scraped={g.scraped_posts}, confirmed={g.detected_posts}", "info", "ORCHESTRATOR")
                except Exception as e:
                    g.stage = "error"
                    g.error = f"parse_error: {e}"
                    log(f"Parsing outputs failed for {link}: {e}", "error", "ORCHESTRATOR")
            state.progress = int((i / max(1, state.total)) * 100)
        # One consolidated email for the whole run, after all groups finish.
        try:
            html_content = build_confirmed_posts_email(state.groups, all_confirmed_posts)
            subject = f"🩺 Hillside - Confirmed Medical Posts Found ({len(all_confirmed_posts)} total)"
            sent_count = send_html_email(recipients, subject, html_content)
            log(f"Consolidated email sent to {len(recipients)} recipient(s), {sent_count} successful", "info", "GMAIL")
        except Exception as e:
            log(f"Error building or sending consolidated email: {e}", "error", "ORCHESTRATOR")
        # Persist a machine-readable summary for the results API handler.
        summary = {"run_date": datetime.now().isoformat(), "groups": [g.__dict__ for g in state.groups]}
        summary_path = os.path.join(ANALYSIS_OUTDIR, "analysis_summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)
        state.summary_path = summary_path
        state.message = "All groups processed"
        state.progress = 100
        state.running = False
        log("Pipeline finished", "info", "ORCHESTRATOR")
    except Exception as e:
        state.message = f"pipeline_error: {e}"
        state.running = False
        log(f"Pipeline error: {e}\n{traceback.format_exc()}", "error", "ORCHESTRATOR")
def index():
    """Serve the single-page UI from the working directory.

    NOTE(review): no @app.route decorator is visible on any handler in
    this file — confirm routes are registered elsewhere (e.g. via
    app.add_url_rule) or were lost in extraction.
    """
    return send_from_directory(".", "index.html")
def system_status():
    """Report readiness of the pipeline's dependencies as JSON.

    NOTE(review): no @app.route decorator is visible — confirm this
    handler is registered with the Flask app elsewhere.
    """
    return jsonify({
        "gmail": gmail_service is not None,
        "groups_file_exists": os.path.exists(GROUPS_TXT),
        "groups_count": len(read_groups(GROUPS_TXT)),
        "scrape_outdir": SCRAPE_OUTDIR,
        "analysis_outdir": ANALYSIS_OUTDIR,
        "sender_email": SENDER_EMAIL,
        "final5_exists": os.path.exists(FINAL5_PATH),
        "gemini_keys_count": len(GEMINI_KEYS)
    })
def api_groups():
    """Return the configured group links as {"groups": [...]}."""
    group_links = read_groups(GROUPS_TXT)
    return jsonify({"groups": group_links})
def api_process_start():
    """Kick off the pipeline in a background thread.

    Responds 409 when a run is already in flight. Recipients come from
    the JSON body (list or comma-separated string), defaulting to
    SENDER_EMAIL.
    """
    if state.running:
        return jsonify({"success": False, "message": "Already running"}), 409
    payload = request.json or {}
    recips = payload.get("recipients") or [SENDER_EMAIL]
    if isinstance(recips, str):
        recips = [addr.strip() for addr in recips.split(",") if addr.strip()]
    worker = threading.Thread(target=run_pipeline, args=(recips,), daemon=True)
    worker.start()
    log(f"Start requested by client; recipients={recips}", "info", "API")
    return jsonify({"success": True, "message": "Pipeline started", "recipients": recips})
def api_process_status():
    """Return overall pipeline progress plus per-group result records."""
    snapshot = {
        "running": state.running,
        "message": state.message,
        "progress": state.progress,
        "current": state.current,
        "total": state.total,
        "groups": [g.__dict__ for g in state.groups]
    }
    return jsonify(snapshot)
def api_process_logs():
    """Incremental log fetch: entries newer than ?after=<id>, capped at ?limit=."""
    after = int(request.args.get("after", "0"))
    limit = int(request.args.get("limit", "500"))
    entries, last_id = logs.get_after(after, limit=limit)
    return jsonify({"entries": entries, "last": last_id})
def api_clear_logs():
    """Empty the shared log buffer on behalf of the UI."""
    logs.clear()
    log("Logs cleared by client", "info", "API")
    return jsonify({"success": True})
def api_live_state():
    """Return the live scrape state for the group currently in flight."""
    with live_lock:
        # jsonify serializes while the lock is held, so the response is a
        # consistent snapshot of live_state.
        return jsonify({"success": True, "data": live_state})
def api_results_summary():
    """Return the latest run-summary JSON (404 until a run has finished)."""
    summary_file = state.summary_path or os.path.join(ANALYSIS_OUTDIR, "analysis_summary.json")
    if not os.path.exists(summary_file):
        return jsonify({"success": False, "message": "No summary yet"}), 404
    with open(summary_file, "r", encoding="utf-8") as fh:
        payload = json.load(fh)
    return jsonify({"success": True, "data": payload})
def api_get_recipients():
    """Load the default recipient list from recipients.json.

    404 when the file is absent; 500 when it is unreadable or does not
    contain a JSON list.
    """
    recipients_path = "recipients.json"
    if not os.path.exists(recipients_path):
        return jsonify({"success": False, "message": "recipients.json not found"}), 404
    try:
        with open(recipients_path, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
        if not isinstance(loaded, list):
            return jsonify({"success": False, "message": "Invalid format"}), 500
        return jsonify({"success": True, "data": loaded})
    except Exception as e:
        return jsonify({"success": False, "message": f"Error reading file: {str(e)}"}), 500
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", 7860)) | |
| app.run(host="0.0.0.0", port=port) | |