Spaces:
Sleeping
Sleeping
import os
import sys
import subprocess
import threading
import queue
import time
import json
from flask import Flask, send_from_directory, request, jsonify, Response
from flask_cors import CORS
from dotenv import load_dotenv

# Load environment variables (from Hugging Face Secrets).
load_dotenv()

# Make lead_gen_pro importable so its packages (e.g. l3_execution) resolve.
sys.path.append(os.path.join(os.getcwd(), 'lead_gen_pro'))

app = Flask(__name__, static_folder='static')
CORS(app)  # the CRM frontend calls this API cross-origin

# --- Configuration ---
PORT = int(os.environ.get("PORT", 7860))  # 7860 is the Hugging Face Spaces default
BASE_DIR = os.path.join(os.getcwd(), 'lead_gen_pro')
DB_PATH = os.path.join(BASE_DIR, "leads_doe.db")   # container-local SQLite lead store
MAIN_PRO = os.path.join(BASE_DIR, "main_pro.py")   # scraping engine entry point

# Supabase config -- loaded from environment, with hard-coded fallbacks.
# NOTE(review): shipping a live Supabase anon key as a source-code fallback
# exposes the credential to anyone with repo access; move it to Secrets and
# drop the default.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "https://nvssvykqxaurtlgwxwwy.supabase.co")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6Im52c3N2eWtxeGF1cnRsZ3d4d3d5Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NzI4OTIxNjUsImV4cCI6MjA4ODQ2ODE2NX0.onzmNQGBy6jDxWO7TcjZuyvgvId9HiGNzUNs1HmOAMk")

# Log queue feeding the SSE stream; one background scrape at a time.
log_queue = queue.Queue()
active_process = None
process_lock = threading.Lock()
# --- Unified UI Serving ---
def index():
    """Serve the NexusCRM single-page frontend entry point."""
    entry_page = "index.html"
    return send_from_directory(app.static_folder, entry_page)
def static_proxy(path):
    """Serve any other static asset requested by the frontend."""
    folder = app.static_folder
    return send_from_directory(folder, path)
# -------------------------------------------------------
# SUPABASE PROXY -- keys NEVER leave the server
# -------------------------------------------------------
| import urllib.request | |
| import urllib.error | |
def _supabase_headers():
    """Build the auth and content headers required by the Supabase REST API."""
    headers = {
        "Content-Type": "application/json",
        "Prefer": "return=representation",
    }
    headers["apikey"] = SUPABASE_KEY
    headers["Authorization"] = f"Bearer {SUPABASE_KEY}"
    return headers
# Whitelist of tables the proxy will touch; anything else gets a 403.
ALLOWED_TABLES = {"leads", "customers", "opportunities"}
def proxy_get(table):
    """Proxy a GET to the Supabase REST API for *table*.

    Only tables in ALLOWED_TABLES are served (403 otherwise). The client's
    query string (filters, select, order, ...) is forwarded verbatim; when
    absent, a plain ``select=*`` is used. Returns the Supabase JSON payload,
    or an error body carrying the upstream HTTP status.
    """
    if table not in ALLOWED_TABLES:
        return jsonify({"error": "Table not allowed"}), 403
    # Forward client filters as-is; default to selecting every column.
    qs = request.query_string.decode()
    url = f"{SUPABASE_URL}/rest/v1/{table}?{qs or 'select=*'}"
    req = urllib.request.Request(url, headers=_supabase_headers())
    try:
        with urllib.request.urlopen(req) as resp:
            data = json.loads(resp.read().decode())
            return jsonify(data)
    except urllib.error.HTTPError as e:
        # Surface the upstream error body, consistent with proxy_post/proxy_patch
        # (the original threw it away and returned only str(e)).
        body = e.read().decode() if e.fp else str(e)
        return jsonify({"error": body}), e.code
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def proxy_post(table):
    """Proxy a POST (insert) to the Supabase REST API for *table*."""
    if table not in ALLOWED_TABLES:
        return jsonify({"error": "Table not allowed"}), 403
    body = request.get_json(force=True)
    req = urllib.request.Request(
        f"{SUPABASE_URL}/rest/v1/{table}",
        data=json.dumps(body).encode(),
        headers=_supabase_headers(),
        method="POST",
    )
    try:
        with urllib.request.urlopen(req) as resp:
            created = json.loads(resp.read().decode())
            return jsonify(created), 201
    except urllib.error.HTTPError as e:
        # Pass the upstream error body through with its status code.
        detail = e.read().decode() if e.fp else str(e)
        return jsonify({"error": detail}), e.code
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def proxy_patch(table, record_id):
    """Proxy a PATCH (update by primary-key id) to Supabase for *table*."""
    if table not in ALLOWED_TABLES:
        return jsonify({"error": "Table not allowed"}), 403
    hdrs = _supabase_headers()
    hdrs["Prefer"] = "return=minimal"  # updated row representation not needed
    req = urllib.request.Request(
        f"{SUPABASE_URL}/rest/v1/{table}?id=eq.{record_id}",
        data=json.dumps(request.get_json(force=True)).encode(),
        headers=hdrs,
        method="PATCH",
    )
    try:
        with urllib.request.urlopen(req):
            return jsonify({"status": "updated"}), 200
    except urllib.error.HTTPError as e:
        # Pass the upstream error body through with its status code.
        detail = e.read().decode() if e.fp else str(e)
        return jsonify({"error": detail}), e.code
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# -------------------------------------------------------
# LEAD GEN PRO API
# -------------------------------------------------------
def health():
    """Report that the cloud server is up and whether the local DB file exists."""
    payload = {
        "status": "online",
        "environment": "Hugging Face Cloud",
        "db_access": os.path.exists(DB_PATH),
    }
    return jsonify(payload)
def scrape():
    """Launch the Lead Gen Pro scraping engine as a background subprocess.

    Rejects with 409 when a previous run is still alive. The engine's
    combined stdout/stderr is pushed line-by-line onto ``log_queue`` for the
    SSE endpoint; once it exits, results are synced to Supabase and an
    ``[END]`` sentinel is queued. Responds immediately with "launched".
    """
    global active_process
    with process_lock:
        # NOTE(review): a second request arriving between this check and the
        # worker assigning active_process could slip past the 409 guard.
        if active_process and active_process.poll() is None:
            return jsonify({"error": "A scraping process is already running in the cloud."}), 409
    data = request.get_json(force=True) or {}
    # sys.executable instead of bare "python": inside the container "python"
    # may be absent from PATH or resolve to a different interpreter.
    cmd = [
        sys.executable, MAIN_PRO,
        "--pipeline",
        "--niche", data.get("niche", "Real Estate"),
        "--limit", str(data.get("limit", 20)),
        "--type", data.get("lead_type", "both")
    ]
    # Optional geographic and platform filters.
    if data.get("country"): cmd += ["--country", data["country"]]
    if data.get("state"): cmd += ["--state", data["state"]]
    if data.get("city"): cmd += ["--city", data["city"]]
    if data.get("b2b_platforms"):
        cmd += ["--b2b-platforms"] + data["b2b_platforms"]
    if data.get("b2c_platforms"):
        cmd += ["--b2c-platforms"] + data["b2c_platforms"]

    def _run_worker():
        """Run the engine, stream its output to log_queue, then sync to Supabase."""
        global active_process
        log_queue.put("π [CLOUD] Iniciando motor de scraping en Hugging Face...\n")
        proc = subprocess.Popen(
            cmd,
            cwd=BASE_DIR,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge streams so the UI sees one log
            text=True,
            bufsize=1,                 # line-buffered for live streaming
            env={**os.environ, "PYTHONUNBUFFERED": "1"}
        )
        with process_lock:
            active_process = proc
        for line in proc.stdout:
            log_queue.put(line)
        proc.wait()
        log_queue.put(f"\nβ [CLOUD] Proceso finalizado (CΓ³digo: {proc.returncode})\n")
        # Best-effort sync of fresh leads to Supabase; failures are logged,
        # never raised (the HTTP response was already sent).
        try:
            log_queue.put("β»οΈ [CLOUD] Sincronizando resultados con Supabase Cloud...\n")
            from l3_execution.supabase_sync import sync_all_leads_to_supabase
            stats = sync_all_leads_to_supabase()
            log_queue.put(f"π [SYNC] SincronizaciΓ³n exitosa: {stats}\n")
        except Exception as e:
            log_queue.put(f"β [SYNC-ERROR] Error en sincronizaciΓ³n cloud: {str(e)}\n")
        log_queue.put("[END]")  # sentinel: tells the SSE stream to close

    threading.Thread(target=_run_worker, daemon=True).start()
    return jsonify({"status": "launched", "location": "Hugging Face Space"})
def stream():
    """SSE endpoint pushing scraper log lines to the CRM UI in real time."""
    def event_source():
        yield "data: [CONECTADO] Servidor Hugging Face listo...\n\n"
        while True:
            try:
                entry = log_queue.get(timeout=60)
            except queue.Empty:
                # Keep-alive so proxies don't drop the idle connection.
                yield "data: [PING]\n\n"
                continue
            if entry == "[END]":
                yield "data: [END]\n\n"
                return
            yield f"data: {entry.rstrip()}\n\n"
    return Response(event_source(), mimetype="text/event-stream")
def manual_sync():
    """Trigger an immediate SQLite-to-Supabase sync and report the outcome."""
    try:
        from l3_execution.supabase_sync import sync_all_leads_to_supabase
        result = sync_all_leads_to_supabase()
        return jsonify({"status": "success", "data": result})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
def stats():
    """Return lead-generation statistics from the container-local SQLite DB.

    Response keys: ``total`` (all leads), ``leads_30d`` (created in the last
    30 days), ``pending_whatsapp`` (leads with a phone but no WhatsApp sent),
    and ``by_niche`` (top 5 niches by lead count). All zeros / empty when the
    DB file does not exist yet; 500 with an error message on any failure.
    """
    import sqlite3
    from contextlib import closing
    try:
        if not os.path.exists(DB_PATH):
            return jsonify({
                "total": 0,
                "leads_30d": 0,
                "pending_whatsapp": 0,
                "by_niche": {}
            })
        # closing() guarantees the connection is released even if a query
        # raises (the original leaked the handle on error paths).
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) FROM leads")
            total = cur.fetchone()[0]
            cur.execute("SELECT COUNT(*) FROM leads WHERE created_at > date('now', '-30 days')")
            total_30d = cur.fetchone()[0]
            cur.execute("SELECT COUNT(*) FROM leads WHERE (whatsapp_sent IS NULL OR whatsapp_sent = 0) AND (phone IS NOT NULL OR phone_formatted IS NOT NULL)")
            pending_wa = cur.fetchone()[0]
            cur.execute("SELECT niche, COUNT(*) as count FROM leads GROUP BY niche ORDER BY count DESC LIMIT 5")
            by_niche = {row['niche']: row['count'] for row in cur.fetchall() if row['niche']}
        return jsonify({
            "total": total,
            "leads_30d": total_30d,
            "pending_whatsapp": pending_wa,
            "by_niche": by_niche
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Bind on all interfaces so the Space's reverse proxy can reach the container.
    app.run(host="0.0.0.0", port=PORT)