NexusCRM / app.py
kidpro2002's picture
fix(backend): correct platform parameter passing and db stats endpoints
54951df
import os
import sys
import subprocess
import threading
import queue
import time
import json
from flask import Flask, send_from_directory, request, jsonify, Response
from flask_cors import CORS
from dotenv import load_dotenv
# Load variables from a .env file (if one exists) into the process environment.
# NOTE(review): the original note says these values come from Hugging Face
# Secrets — presumably those are already exposed as environment variables; confirm.
load_dotenv()
# Ensure lead_gen_pro is in the path so we can import its modules.
# The directory is resolved relative to the current working directory.
sys.path.append(os.path.join(os.getcwd(), 'lead_gen_pro'))
# Flask application object; its static folder is the local "static" directory.
app = Flask(__name__, static_folder='static')
# Enable CORS on the app using flask_cors defaults.
CORS(app)
# --- Configuration ---
# Port the server listens on; falls back to 7860 when PORT is not set.
PORT = int(os.environ.get("PORT", 7860))
# Directory of the lead_gen_pro engine, resolved from the current working directory.
BASE_DIR = os.path.join(os.getcwd(), 'lead_gen_pro')
# SQLite database file read by /api/stats and probed by /api/health.
DB_PATH = os.path.join(BASE_DIR, "leads_doe.db")
# Script launched as a subprocess by /api/scrape.
MAIN_PRO = os.path.join(BASE_DIR, "main_pro.py")
# Supabase config — loaded from environment or defaults
# NOTE(review): the fallback project URL and API key are hard-coded in source.
# Consider requiring SUPABASE_URL / SUPABASE_KEY from the environment and
# rotating this key if the repository is public.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "https://nvssvykqxaurtlgwxwwy.supabase.co")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6Im52c3N2eWtxeGF1cnRsZ3d4d3d5Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NzI4OTIxNjUsImV4cCI6MjA4ODQ2ODE2NX0.onzmNQGBy6jDxWO7TcjZuyvgvId9HiGNzUNs1HmOAMk")
# Log queue for SSE (Server-Sent Events): the scrape worker puts lines on it
# and /api/stream takes them off.
log_queue = queue.Queue()
# Handle of the most recently launched scraping subprocess (None before the first launch).
active_process = None
# Lock taken around reads and writes of active_process.
process_lock = threading.Lock()
# --- Unified UI Serving ---
@app.route("/")
def index():
    """Return the frontend entry page (index.html) from the static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, "index.html")
@app.route("/<path:path>")
def static_proxy(path):
    """Return the file at ``path`` from the app's static folder."""
    static_root = app.static_folder
    requested_file = path
    return send_from_directory(static_root, requested_file)
# ═══════════════════════════════════════════════════════
# SUPABASE PROXY — Keys NEVER leave the server
# ═══════════════════════════════════════════════════════
import urllib.request
import urllib.error
def _supabase_headers():
    """Return a new dict of HTTP headers for calls to the Supabase REST API.

    The configured key is sent both as ``apikey`` and as a bearer token.
    A fresh dict is built on every call, so callers may modify it freely.
    """
    headers = {}
    headers["apikey"] = SUPABASE_KEY
    headers["Authorization"] = "Bearer {}".format(SUPABASE_KEY)
    headers["Content-Type"] = "application/json"
    headers["Prefer"] = "return=representation"
    return headers
# Tables the proxy endpoints are allowed to touch; anything else gets a 403.
ALLOWED_TABLES = {"leads", "customers", "opportunities"}


@app.route("/api/data/<table>")
def proxy_get(table):
    """Proxy a GET (read) request for ``table`` to the Supabase REST API.

    Args:
        table: Table name taken from the URL; must be in ``ALLOWED_TABLES``.

    Returns:
        The decoded Supabase JSON payload on success; ``403`` for a table that
        is not allowed; the upstream status code with the upstream error body
        when Supabase answers with an HTTP error; ``500`` for any other failure.
    """
    if table not in ALLOWED_TABLES:
        return jsonify({"error": "Table not allowed"}), 403

    # Forward the caller's query string verbatim (filters, select, order, ...).
    # With no query string, ask for every column.
    qs = request.query_string.decode() or "select=*"
    url = f"{SUPABASE_URL}/rest/v1/{table}?{qs}"
    req = urllib.request.Request(url, headers=_supabase_headers())
    try:
        with urllib.request.urlopen(req) as resp:
            data = json.loads(resp.read().decode())
        return jsonify(data)
    except urllib.error.HTTPError as e:
        # Relay the upstream error body (as the POST/PATCH proxies do) instead
        # of only the generic "HTTP Error NNN" text, so callers see the cause.
        body = e.read().decode(errors="replace") if e.fp else str(e)
        return jsonify({"error": body}), e.code
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/data/<table>", methods=["POST"])
def proxy_post(table):
    """Forward a JSON insert for ``table`` to Supabase and relay the response.

    Responds ``201`` with the decoded upstream JSON on success, ``403`` when
    the table is not in ``ALLOWED_TABLES``, the upstream status and error body
    on an HTTP error, and ``500`` on any other failure.
    """
    if table not in ALLOWED_TABLES:
        return jsonify({"error": "Table not allowed"}), 403

    incoming = request.get_json(force=True)
    encoded_body = json.dumps(incoming).encode()
    target = f"{SUPABASE_URL}/rest/v1/{table}"
    upstream_request = urllib.request.Request(
        target,
        data=encoded_body,
        headers=_supabase_headers(),
        method="POST",
    )

    try:
        with urllib.request.urlopen(upstream_request) as upstream:
            raw = upstream.read()
            created = json.loads(raw.decode())
            return jsonify(created), 201
    except urllib.error.HTTPError as http_err:
        # Prefer the upstream response body when one is attached to the error.
        if http_err.fp:
            detail = http_err.read().decode()
        else:
            detail = str(http_err)
        return jsonify({"error": detail}), http_err.code
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/api/data/<table>/<record_id>", methods=["PATCH"])
def proxy_patch(table, record_id):
    """Proxy a PATCH (update) of one record to the Supabase REST API.

    Args:
        table: Table name taken from the URL; must be in ``ALLOWED_TABLES``.
        record_id: Value matched against the ``id`` column (``id=eq.<record_id>``).

    Returns:
        ``{"status": "updated"}`` with ``200`` on success; ``403`` for a table
        that is not allowed; the upstream status code and error body when
        Supabase answers with an HTTP error; ``500`` for any other failure.
    """
    from urllib.parse import quote

    if table not in ALLOWED_TABLES:
        return jsonify({"error": "Table not allowed"}), 403

    payload = json.dumps(request.get_json(force=True)).encode()
    headers = _supabase_headers()
    # No response body is needed for an update.
    headers["Prefer"] = "return=minimal"

    # record_id is caller-controlled and arrives URL-decoded, so it may hold
    # '&', '=', spaces or non-ASCII text. Percent-encode it so it cannot add
    # extra query parameters upstream or produce an invalid URL.
    safe_id = quote(record_id, safe="")
    req = urllib.request.Request(
        f"{SUPABASE_URL}/rest/v1/{table}?id=eq.{safe_id}",
        data=payload,
        headers=headers,
        method="PATCH"
    )
    try:
        with urllib.request.urlopen(req):
            return jsonify({"status": "updated"}), 200
    except urllib.error.HTTPError as e:
        body = e.read().decode() if e.fp else str(e)
        return jsonify({"error": body}), e.code
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# ═══════════════════════════════════════════════════════
# LEAD GEN PRO API
# ═══════════════════════════════════════════════════════
@app.route("/api/health")
def health():
    """Report that the server is up and whether the local DB file exists."""
    db_present = os.path.exists(DB_PATH)
    report = {
        "status": "online",
        "environment": "Hugging Face Cloud",
        "db_access": db_present,
    }
    return jsonify(report)
def _platform_args(value):
    """Turn a platforms field (a list, or a single name) into a list of CLI strings."""
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    return [str(value)]


@app.route("/api/scrape", methods=["POST"])
def scrape():
    """Launch the Lead Gen Pro scraping pipeline as a background subprocess.

    The JSON body may contain ``niche``, ``limit``, ``lead_type``, ``country``,
    ``state``, ``city``, ``b2b_platforms`` and ``b2c_platforms``; each is passed
    to ``main_pro.py`` as a command-line option. Subprocess output is pushed to
    ``log_queue`` line by line; when the process exits, the leads are synced to
    Supabase and a final ``"[END]"`` marker is queued.

    Returns:
        ``{"status": "launched", ...}`` on success; ``400`` when the body is
        not a JSON object; ``409`` when a scrape is already running; ``500``
        when the subprocess cannot be started.
    """
    global active_process

    data = request.get_json(force=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    # Every argv entry must be a string, so coerce user-supplied values.
    cmd = [
        # Run the child with the interpreter serving this app so it sees the
        # same installed packages; fall back to "python" if it is unknown.
        sys.executable or "python", MAIN_PRO,
        "--pipeline",
        "--niche", str(data.get("niche") or "Real Estate"),
        "--limit", str(data.get("limit", 20)),
        "--type", str(data.get("lead_type") or "both"),
    ]
    for field in ("country", "state", "city"):
        if data.get(field):
            cmd += [f"--{field}", str(data[field])]
    if data.get("b2b_platforms"):
        cmd += ["--b2b-platforms"] + _platform_args(data["b2b_platforms"])
    if data.get("b2c_platforms"):
        cmd += ["--b2c-platforms"] + _platform_args(data["b2c_platforms"])

    # Check and launch under one lock acquisition: two concurrent requests
    # must not both pass the "already running" check.
    with process_lock:
        if active_process is not None and active_process.poll() is None:
            return jsonify({"error": "A scraping process is already running in the cloud."}), 409
        try:
            proc = subprocess.Popen(
                cmd,
                cwd=BASE_DIR,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                env={**os.environ, "PYTHONUNBUFFERED": "1"}
            )
        except OSError as e:
            return jsonify({"error": f"Could not start the scraping process: {e}"}), 500
        active_process = proc

    def _run_worker():
        # The finally clause guarantees stream clients always receive "[END]",
        # even if reading the child's output fails.
        try:
            log_queue.put("🚀 [CLOUD] Iniciando motor de scraping en Hugging Face...\n")
            with proc.stdout:
                for line in proc.stdout:
                    log_queue.put(line)
            proc.wait()
            log_queue.put(f"\n✅ [CLOUD] Proceso finalizado (Código: {proc.returncode})\n")
            try:
                log_queue.put("♻️ [CLOUD] Sincronizando resultados con Supabase Cloud...\n")
                from l3_execution.supabase_sync import sync_all_leads_to_supabase
                sync_stats = sync_all_leads_to_supabase()
                log_queue.put(f"📊 [SYNC] Sincronización exitosa: {sync_stats}\n")
            except Exception as e:
                log_queue.put(f"❌ [SYNC-ERROR] Error en sincronización cloud: {str(e)}\n")
        finally:
            log_queue.put("[END]")

    threading.Thread(target=_run_worker, daemon=True).start()
    return jsonify({"status": "launched", "location": "Hugging Face Space"})
@app.route("/api/stream")
def stream():
    """SSE endpoint that relays queued log lines to the CRM UI in real time.

    Each item taken from ``log_queue`` is sent as one Server-Sent Event. The
    stream ends after the ``"[END]"`` marker is relayed; a ``[PING]`` event is
    sent whenever no line arrives within 60 seconds.

    NOTE(review): ``log_queue`` is a single shared queue, so when several
    clients are connected each queued line is delivered to only one of them.
    """

    def _sse_event(message):
        # An SSE event ends at the first blank line, and each line of a payload
        # needs its own "data:" field. Messages with embedded newlines would
        # otherwise be cut short and their remaining text dropped by the client.
        normalized = message.replace("\r\n", "\n").replace("\r", "\n")
        fields = "".join(f"data: {part}\n" for part in normalized.split("\n"))
        return fields + "\n"

    def generate():
        yield "data: [CONECTADO] Servidor Hugging Face listo...\n\n"
        while True:
            try:
                line = log_queue.get(timeout=60)
            except queue.Empty:
                # Keep-alive so the connection is not considered idle.
                yield "data: [PING]\n\n"
                continue
            if line == "[END]":
                yield "data: [END]\n\n"
                break
            yield _sse_event(line.rstrip())

    return Response(generate(), mimetype="text/event-stream")
@app.route("/api/sync", methods=["POST"])
def manual_sync():
    """Run the leads-to-Supabase sync on demand and report its outcome.

    Responds with ``{"status": "success", "data": ...}`` when the sync returns,
    or ``{"status": "error", "message": ...}`` and ``500`` on any exception
    (including a failure to import the sync module).
    """
    try:
        from l3_execution.supabase_sync import sync_all_leads_to_supabase as run_sync
        summary = run_sync()
        success = {"status": "success", "data": summary}
        return jsonify(success)
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure), 500
@app.route("/api/stats")
def stats():
    """Return lead-generation statistics read from the local SQLite database.

    Returns:
        JSON with ``total`` (all leads), ``leads_30d`` (leads whose
        ``created_at`` is within the last 30 days), ``pending_whatsapp`` (leads
        with a phone number and no WhatsApp message sent) and ``by_niche``
        (counts for the five most common niches, empty niches omitted).
        All values are zero/empty when the database file does not exist.
        On any error, ``{"error": ...}`` with status ``500``.
    """
    import sqlite3
    from contextlib import closing

    try:
        if not os.path.exists(DB_PATH):
            return jsonify({
                "total": 0,
                "leads_30d": 0,
                "pending_whatsapp": 0,
                "by_niche": {}
            })

        # closing() releases the connection even when a query raises;
        # previously a failing query skipped conn.close() and leaked it.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()

            cur.execute("SELECT COUNT(*) FROM leads")
            total = cur.fetchone()[0]

            cur.execute("SELECT COUNT(*) FROM leads WHERE created_at > date('now', '-30 days')")
            total_30d = cur.fetchone()[0]

            cur.execute("SELECT COUNT(*) FROM leads WHERE (whatsapp_sent IS NULL OR whatsapp_sent = 0) AND (phone IS NOT NULL OR phone_formatted IS NOT NULL)")
            pending_wa = cur.fetchone()[0]

            cur.execute("SELECT niche, COUNT(*) as count FROM leads GROUP BY niche ORDER BY count DESC LIMIT 5")
            by_niche = {row['niche']: row['count'] for row in cur.fetchall() if row['niche']}

        return jsonify({
            "total": total,
            "leads_30d": total_30d,
            "pending_whatsapp": pending_wa,
            "by_niche": by_niche
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Started directly as a script: listen on all interfaces at the configured port.
    app.run(port=PORT, host="0.0.0.0")