leadgen / app.py
MichaelWelsch's picture
Update app.py
ce1b2e3 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gradio App – robuste Async-Ausführung via Status-Endpoints
- Rein asynchron mit Status-Endpoints
- Läuft serverseitig weiter, auch wenn UI/Verbindung weg ist
- Globale HTTP-Session mit Keep-Alive
- Polling mit Backoff + Retry-After
- Logging in Konsole + logs/<job_id>.log
- FIX: exclude_hash richtig laden/berücksichtigen (paginierte Wholix-Suche)
- FIX: pro Lead harte Fehlerisolierung -> kein Gradio-Absturz
- NEU: Speichert IMMER in Wholix. Falls keine echte E-Mail vorhanden ist,
wird eine stabile Platzhalter-Adresse generiert (example.com) und
ein Tag "no-email" gesetzt.
- NEU: 422-Robustheit -> progressive Degradation & Minimal-Record-Fallback
- NEU: URL-Normalisierung (Umlaute/IDNA) für Pfad/Query/Fragment
"""
import json
import os
import random
import re
import time
import uuid
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple, Set
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
import gradio as gr
import requests
from requests.adapters import HTTPAdapter
# ============================== CONFIG ====================================
LEAD_BASE_URL = "https://wholixleadgenbackend.ngrok.io"
WHOLIX_BASE_URL = "https://api.wholix.ai"
MAX_LEADS = 100 # Sicherheitskappe
MAX_WORKERS = 16 # gleichzeitige Pipelines am Server
JOB_TTL_SEC = 60 * 60 # wie lange fertige Jobs im Speicher bleiben (1h)
# ============================== LOGGING ====================================
LOG_DIR = Path("logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(threadName)s | %(message)s",
)
APP_LOG = logging.getLogger("app")
def get_job_logger(job_id: str) -> logging.Logger:
lg = logging.getLogger(f"job.{job_id}")
if not any(isinstance(h, RotatingFileHandler) for h in lg.handlers):
fh = RotatingFileHandler(
LOG_DIR / f"{job_id}.log",
maxBytes=512_000,
backupCount=2,
encoding="utf-8",
)
fh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
lg.addHandler(fh)
lg.setLevel(logging.INFO)
lg.propagate = True
return lg
# ========================== GLOBAL HTTP SESSION ============================
GLOBAL_SES = requests.Session()
GLOBAL_SES.headers.update({"Accept": "application/json, text/plain;q=0.9, */*;q=0.8"})
adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50, max_retries=0)
GLOBAL_SES.mount("https://", adapter)
GLOBAL_SES.mount("http://", adapter)
try:
from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter
ka = TCPKeepAliveAdapter(idle=30, interval=30, count=3)
GLOBAL_SES.mount("https://", ka)
GLOBAL_SES.mount("http://", ka)
except Exception:
pass
# ================================ HTTP =====================================
class HTTPError(Exception):
def __init__(self, status: int, url: str, body: Any = None, headers: Dict[str, str] = None):
msg = f"HTTP {status} on {url}"
if body is not None:
try:
bs = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
msg += f": {bs[:500]}"
except Exception:
pass
super().__init__(msg)
self.status = status
self.url = url
self.body = body
self.headers = headers or {}
def _is_json_ct(ct: str) -> bool:
return bool(ct and ("application/json" in ct or ct.endswith("+json")))
def _safe_json(text: str):
try:
return json.loads(text)
except Exception:
repaired = text.replace("NaN", "null").replace("Infinity", "null").replace("-Infinity", "null")
return json.loads(repaired)
def req(
url: str,
method: str = "GET",
headers: Optional[Dict[str, str]] = None,
json_body: Any = None,
data: Any = None,
timeout: Tuple[float, float] = (5.0, 15.0),
session: Optional[requests.Session] = None,
) -> Any:
s = session or GLOBAL_SES
r = s.request(
method=method.upper(),
url=url,
headers=headers,
json=json_body,
data=data,
timeout=timeout,
)
ct = r.headers.get("content-type", "")
text = r.text or ""
if not r.ok:
body = None
if text:
try:
body = r.json() if _is_json_ct(ct) or (not ct and text.strip().startswith(("{", "["))) else text
except Exception:
body = text
raise HTTPError(r.status_code, url, body=body, headers=dict(r.headers))
if text == "":
return None
if _is_json_ct(ct) or (not ct and text.strip().startswith(("{", "["))):
try:
return r.json()
except Exception:
return _safe_json(text)
return text
# ========================= URL NORMALISIERUNG ==============================
from urllib.parse import urlsplit, urlunsplit, quote
def _normalize_url(u: Optional[str]) -> Optional[str]:
"""
Normalisiert URLs für strenge Validatoren:
- Scheme ergänzen (https)
- Host -> IDNA (punycode)
- Path/Query/Fragment UTF-8 percent-encoden
"""
if not u:
return None
u = u.strip()
if not re.match(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://", u):
u = "https://" + u
try:
parts = urlsplit(u)
if not parts.scheme or not parts.netloc:
return None
# Host zu IDNA
try:
host = parts.hostname.encode("idna").decode("ascii") if parts.hostname else ""
except Exception:
return None
netloc = host
if parts.port:
netloc += f":{parts.port}"
if parts.username:
auth = parts.username
if parts.password:
auth += f":{parts.password}"
netloc = f"{auth}@{netloc}"
path = quote(parts.path or "", safe="/-._~")
query = quote(parts.query or "", safe="=&?/-._~")
fragment = quote(parts.fragment or "", safe="/-._~")
return urlunsplit((parts.scheme.lower(), netloc.lower(), path, query, fragment))
except Exception:
return None
# ========================= Async Start & Poll ==============================
def _looks_like_ngrok_html(body: Any) -> bool:
try:
s = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
except Exception:
s = str(body)
s_lower = (s or "").lower()
return ("cdn.ngrok.com" in s_lower) or ("ngrok" in s_lower and "<html" in s_lower)
def start_async_job(url: str, body: dict, headers: dict, session: Optional[requests.Session] = None) -> str:
"""
Startet einen asynchronen Backend-Job und gibt die job_id zurück.
Neu: interne Retries (Backoff + Jitter) bei 404/408/425/429/5xx und ngrok-HTML.
Macht vorher einen Warmup-Ping auf die Base-URL.
"""
s = session or GLOBAL_SES
# --- Warmup: ngrok/Server "aufwecken" (ignoriert Fehler) ---
try:
base = re.split(r"(?<=://[^/]+)", url)[0] # "https://host.tld"
s.get(base, timeout=(3.0, 3.0))
except Exception:
pass
# --- POST mit Retry ---
attempts = 0
max_attempts = 6
delay = 0.8
while True:
attempts += 1
try:
res = req(
url=url,
method="POST",
headers={**headers, "Prefer": "respond-async"},
json_body=body,
timeout=(5.0, 15.0),
session=s,
)
job_id = (res or {}).get("job_id")
ok = (res or {}).get("ok", False)
if not ok or not job_id:
raise RuntimeError(f"Backend hat keine job_id geliefert (res={res!r})")
return job_id
except HTTPError as e:
transient_codes = {404, 408, 425, 429, 500, 502, 503, 504, 520, 522, 524}
is_transient = (e.status in transient_codes) or _looks_like_ngrok_html(e.body)
if attempts < max_attempts and is_transient:
# Retry-After respektieren
ra = 0.0
try:
if isinstance(e.body, dict):
ra = float(e.body.get("retry_after", 0) or 0)
except Exception:
ra = 0.0
if not ra:
try:
ra_hdr = (e.headers or {}).get("Retry-After", "")
ra = float(ra_hdr) if ra_hdr else 0.0
except Exception:
ra = 0.0
sleep_for = ra or (delay + random.uniform(0, 0.5 * delay))
logging.warning(f"start_async_job: transient {e.status}, retry {attempts}/{max_attempts} in {sleep_for:.1f}s …")
time.sleep(sleep_for)
delay = min(8.0, delay * 1.8)
continue
# nicht transient oder Versuche aufgebraucht
raise
def poll_status(
url: str,
headers: dict,
max_wait_sec: int = 60 * 20,
min_delay: float = 1.0,
max_delay: float = 30.0,
session: Optional[requests.Session] = None,
on_tick=None,
) -> dict:
t0 = time.time()
attempt = 0
delay = min_delay
while True:
if time.time() - t0 > max_wait_sec:
raise TimeoutError("Timeout beim Warten auf Ergebnis.")
try:
st = req(url, method="GET", headers=headers, timeout=(5.0, 6.0), session=session)
except HTTPError as e:
if e.status in (408, 425, 429, 500, 502, 503, 504, 520, 522, 524):
ra = 0.0
if isinstance(e.body, dict):
try:
ra = float(e.body.get("retry_after", 0))
except Exception:
ra = 0.0
if not ra:
try:
ra_hdr = (e.headers or {}).get("Retry-After", "")
ra = float(ra_hdr) if ra_hdr else 0.0
except Exception:
ra = 0.0
attempt += 1
delay = min(max_delay, max(min_delay, 0.7 * (2 ** attempt)))
time.sleep(ra or (delay + random.uniform(0, 0.4 * delay)))
continue
raise
if callable(on_tick):
try:
on_tick(st)
except Exception:
pass
if st and st.get("ok") and st.get("status") == "done":
return st
attempt = min(attempt + 1, 20)
delay = min(max_delay, max(min_delay, delay * 1.6))
time.sleep(delay)
# ============================= Services ====================================
def unwrap_result(payload: Any) -> Any:
p = payload or {}
if isinstance(p, dict) and p.get("ok") and any(k in p for k in ("result", "data", "content")):
p = p.get("result") or p.get("data") or p.get("content")
return p
def suggest_single_lead(token_id: str, filters: dict, icp_text: str, exclude_ids: List[str]) -> dict:
res_job_id = start_async_job(
url=f"{LEAD_BASE_URL}/lead/suggest?async=1",
body={"filters": filters, "icp_text": icp_text or "", "exclude_ids": exclude_ids or []},
headers={"X-Token-Id": token_id},
)
st = poll_status(
url=f"{LEAD_BASE_URL}/lead/suggest/status/{res_job_id}?t={int(time.time()*1000)}",
headers={"X-Token-Id": token_id},
max_wait_sec=60 * 20,
session=GLOBAL_SES,
)
res = unwrap_result(st)
if isinstance(res, dict) and isinstance(res.get("items"), list) and res["items"]:
return res["items"][0]
if isinstance(res, list) and res:
return res[0]
return res if isinstance(res, dict) else {"result": res}
def _flatten_text(val: Any) -> str:
"""
Newlines/Tabs/Mehrfach-Whitespace sauber zu einem Space zusammenziehen.
Keine Feldraten, nur Plain-String-Verarbeitung.
"""
s = "" if val is None else str(val)
s = s.replace("\r\n", "\n").replace("\r", "\n")
s = re.sub(r"\s+", " ", s).strip()
return s
def normalize_draft(raw: Any) -> Dict[str, Any]:
r = unwrap_result(raw) or raw or {}
if isinstance(r, dict) and "message" in r and isinstance(r["message"], dict):
r = r["message"]
email_obj = r.get("email") if isinstance(r, dict) else {}
def pick(obj, *keys):
if not isinstance(obj, dict): return ""
for k in keys:
for kk, vv in obj.items():
if isinstance(kk, str) and kk.lower() == k.lower():
if isinstance(vv, str) and vv.strip(): return vv.strip()
if vv: return vv
return ""
subject = pick(email_obj, "subject", "email_subject", "Betreff") or pick(r, "subject", "email_subject", "Betreff")
body = pick(email_obj, "body", "text", "content") or pick(r, "body", "text", "content")
fu1 = pick(r, "followup1", "FollowUp1", "LinkedIn")
fu2 = pick(r, "followup2", "FollowUp2", "Facebook")
to = pick(email_obj, "to")
return {"email": {"to": to, "subject": subject, "body": body}, "followup1": fu1, "followup2": fu2}
def email_generate_async(token_id: str, variables: dict, items: List[dict]) -> dict:
"""
Startet /email/generate (async) und gibt nur ECHTE Endpoint-Daten zurück.
- Übergibt globale Variablen 1:1 (inkl. Signatur) auf Root-Ebene.
- Keine Feldumbenennung, keine Heuristik.
Rückgabe:
{
"email": {"subject": <Betreff>, "body": <Text>},
"followup1": <FollowUp1>,
"followup2": <FollowUp2>,
"raw": <unwrap_result(...)>
}
"""
if not items:
raise ValueError("items fehlt (mindestens 1 Lead erforderlich).")
root_allowed = {
"Produkt_und_Dienstleistungsbeschreibung",
"CTA",
"Signatur",
"Checkliste_Landingpage",
"homepage_url", # optional, falls im Frontend genutzt
"tags", # wird serverseitig ignoriert, aber ok
"Touch_Point", # optional – stört den Backend-Kontrakt nicht
}
safe_vars = {}
for k, v in (variables or {}).items():
if k in root_allowed:
safe_vars[k] = v
start_body = {**safe_vars, "items": items, "item_index": 0}
try:
sent_keys = sorted(list(start_body.keys()))
sig_preview = str(start_body.get("Signatur") or "")[:120]
APP_LOG.info(f"/email/generate body keys: {sent_keys}")
APP_LOG.info(f"/email/generate Signatur (preview): {sig_preview}")
except Exception:
pass
res_job_id = start_async_job(
url=f"{LEAD_BASE_URL}/email/generate?async=1",
body=start_body,
headers={"X-Token-Id": token_id},
)
st = poll_status(
url=f"{LEAD_BASE_URL}/email/generate/status/{res_job_id}?t={int(time.time()*1000)}",
headers={"X-Token-Id": token_id},
max_wait_sec=60 * 20,
session=GLOBAL_SES,
)
raw = unwrap_result(st) or {}
results = raw.get("results") if isinstance(raw, dict) else None
first = (results[0] if isinstance(results, list) and results else {}) or {}
msg = first.get("message") if isinstance(first, dict) else {}
subj = str((msg or {}).get("Betreff") or "")
text = str((msg or {}).get("Text") or "")
fu1 = str((msg or {}).get("FollowUp1") or "")
fu2 = str((msg or {}).get("FollowUp2") or "")
return {
"email": {"subject": subj, "body": text},
"followup1": fu1,
"followup2": fu2,
"raw": raw,
}
def wholix_login(email: str, password: str) -> str:
"""
Kompatibel zur alten Signatur (liefert weiterhin ein Token-String),
richtet aber zusätzlich den globalen Auto-ReLogin-Manager _WHOLIX_SES ein.
"""
global _WHOLIX_SES
_WHOLIX_SES = WholixSession(email, password, base_url=WHOLIX_BASE_URL, session=GLOBAL_SES)
return _WHOLIX_SES.get_token()
# ===================== Helfer für Platzhalter-E-Mail =======================
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _slug(val: Any, maxlen: int = 24) -> str:
s = "" if val is None else str(val)
s = s.lower().strip()
s = re.sub(r"[^a-z0-9]+", "-", s)
s = re.sub(r"-+", "-", s).strip("-")
return s[:maxlen] or "x"
def _make_placeholder_email(record: dict) -> str:
# Stable, valide, kurz
fn = _slug(record.get("firstname"))
ln = _slug(record.get("lastname"))
base = (fn + "." + ln).strip(".") or "lead"
cid = _slug(record.get("exclude_hash") or uuid.uuid4().hex[:8], maxlen=16)
local = f"{base}.{cid}"[:48]
return f"{local}@example.com" # RFC-Reservedomäne, von Validatoren akzeptiert
# ===================== Wholix Store + Fallbacks ============================
# ============================= Wholix Auth ================================
# ============================= Wholix Auth (NEU) =============================
# Globaler Session-Manager (wird in wholix_login gesetzt)
_WHOLIX_SES = None # type: Optional["WholixSession"]
class WholixSession:
"""
Kapselt Token-Handling:
- Login mit email/password
- Autorisierte Requests mit Auto-ReLogin bei 401/403/419 (ein Retry)
- Thread-sicher
"""
def __init__(self, email: str, password: str, base_url: str = WHOLIX_BASE_URL, session: Optional[requests.Session] = None):
self.email = email
self.password = password
self.base_url = base_url.rstrip("/")
self._token = None
self._lock = threading.RLock()
self._ses = session or GLOBAL_SES
def _login(self) -> str:
res = req(f"{self.base_url}/api/v1/auth/login", method="POST",
json_body={"email": self.email, "password": self.password},
timeout=(5.0, 15.0), session=self._ses)
token = (res or {}).get("token")
if not token:
raise RuntimeError("Wholix-Login fehlgeschlagen.")
return token
def get_token(self) -> str:
if self._token:
return self._token
with self._lock:
if not self._token:
self._token = self._login()
return self._token
def auth_headers(self) -> Dict[str, str]:
return {"Authorization": f"Bearer {self.get_token()}"}
def req_authed(self, path: str, *, method: str = "GET", headers: Optional[Dict[str, str]] = None,
json_body: Any = None, data: Any = None, timeout: Tuple[float, float] = (5.0, 30.0)):
url = path if path.startswith("http") else f"{self.base_url}{path}"
hdrs = {}
hdrs.update(headers or {})
hdrs.update(self.auth_headers())
try:
return req(url, method=method, headers=hdrs, json_body=json_body, data=data, timeout=timeout, session=self._ses)
except HTTPError as e:
if e.status in (401, 403, 419):
with self._lock:
self._token = None
self._token = self._login()
hdrs = {}
hdrs.update(headers or {})
hdrs.update(self.auth_headers())
return req(url, method=method, headers=hdrs, json_body=json_body, data=data, timeout=timeout, session=self._ses)
raise
def _store_with_fallbacks(token: str, payload: dict, module: str) -> dict:
"""
Versucht 3 Stufen, um 422 zu vermeiden.
NEU: nutzt, wenn vorhanden, den globalen WholixSession-Manager (_WHOLIX_SES)
mit Auto-ReLogin. Fällt sonst auf das übergebene Token zurück.
"""
url_path = "/api/v1/table-object-data/store-objects"
def _post_with_session(body):
# bevorzugt Auto-ReLogin-Session
if _WHOLIX_SES is not None:
return _WHOLIX_SES.req_authed(url_path, method="POST", json_body=body, timeout=(5.0, 30.0))
# Fallback: manuelles Token (legacy)
url = f"{WHOLIX_BASE_URL}{url_path}"
headers = {"Authorization": f"Bearer {token}"}
return req(url, method="POST", headers=headers, json_body=body, timeout=(5.0, 30.0))
# 1: Original
body1 = {"module": module, "action": "store", "data": [payload]}
try:
return _post_with_session(body1)
except HTTPError as e:
if e.status != 422:
raise
# 2: URLs reparieren/entfernen
p2 = dict(payload)
for k in ("linkedin_url", "company_url"):
if p2.get(k):
fixed = _normalize_url(p2.get(k))
if fixed:
p2[k] = fixed
else:
p2.pop(k, None)
try:
return _post_with_session({"module": module, "action": "store", "data": [p2]})
except HTTPError as e:
if e.status != 422:
raise
# 3: Minimal-Record
minimal = {
"email": p2.get("email") or _make_placeholder_email(p2),
"firstname": p2.get("firstname") or "",
"lastname": p2.get("lastname") or "",
"company_name": p2.get("company_name") or "",
"exclude_hash": p2.get("exclude_hash") or _slug(uuid.uuid4().hex[:8]),
"tags": p2.get("tags") or {"keys": ["no-email"], "values": ["no-email"]},
}
return _post_with_session({"module": module, "action": "store", "data": [minimal]})
def wholix_store_contact(token: str, record: dict, module: str = "Contacts", allow_placeholder: bool = True) -> dict:
"""
Sendet NUR erlaubte Felder an Wholix und saniert problematische Werte.
IMMER speicherbar: erzeugt bei Bedarf Placeholder-Mail (example.com) + Tag.
Nutzt _store_with_fallbacks (mit Auto-ReLogin via _WHOLIX_SES, falls vorhanden).
"""
if not isinstance(record, dict):
raise ValueError("Wholix: record muss ein dict sein.")
def _clean_str(v):
if v is None:
return None
s = str(v).strip()
return s if s else None
# E-Mail prüfen / ggf. generieren
email = _clean_str(record.get("email"))
if not email or not EMAIL_RE.match(email):
if allow_placeholder:
email = _make_placeholder_email(record)
record["email"] = email
if isinstance(record.get("tags"), dict):
for k in ("keys", "values"):
record["tags"].setdefault(k, [])
if "no-email" not in record["tags"][k]:
record["tags"][k].append("no-email")
ALLOWED = {
"firstname",
"lastname",
"email", # Pflichtfeld (ggf. Platzhalter)
"adress", # (sic) genau so
"city",
"postcode",
"phonenumber",
"job_title",
"departments", # Text
"linkedin_url",
"company_name",
"company_url",
"message_mail",
"message_mail_subject",
"message_followup1",
"message_followup2",
"exclude_hash",
"status_field", # Multi-Select: {keys:[], values:[]}
"tags", # Multi-Select: {keys:[], values:[]}
}
def _coerce_departments(v):
if v is None:
return None
if isinstance(v, (list, tuple, set)):
v = ", ".join(str(x).strip() for x in v if str(x).strip())
else:
v = str(v).strip()
if v.startswith("[") and v.endswith("]"):
v = v[1:-1].strip().strip("'\"")
return _clean_str(v)
out = {}
for k in ALLOWED:
if k not in record:
continue
val = record[k]
if k == "departments":
val = _coerce_departments(val)
elif k in ("linkedin_url", "company_url"):
val = _normalize_url(val)
elif k in ("status_field", "tags"):
if not (isinstance(val, dict) and "keys" in val and "values" in val):
val = None
else:
val = _clean_str(val)
if val is not None:
out[k] = val
# Pflichtfeld sicher (jetzt inkl. Platzhalter möglich)
out["email"] = _clean_str(email)
# POST mit Fallback-Logik (nutzt Auto-ReLogin falls _WHOLIX_SES vorhanden)
return _store_with_fallbacks(token, out, module)
# ======= NEW: Wholix-Excludes paginiert laden (wie in deinem JS) ===========
def wholix_fetch_excludes(token: str,
module_name: str = "Contacts",
per_page: int = 500,
max_pages: int = 100,
dedupe: bool = True) -> List[Dict[str, str]]:
"""
Paginiert Excludes laden.
NEU: nutzt, wenn vorhanden, den globalen WholixSession-Manager (_WHOLIX_SES)
mit Auto-ReLogin. Fällt sonst auf das übergebene Token zurück.
"""
path = "/api/v1/table-object-data/fetch-paginated-results"
out: List[Dict[str, str]] = []
seen: Set[str] = set()
last_page = 10**9
for page in range(1, max_pages + 1):
if page > last_page:
break
payload = {"module": module_name, "action": "search", "page": page, "per_page": per_page}
try:
if _WHOLIX_SES is not None:
res = _WHOLIX_SES.req_authed(path, method="POST", json_body=payload, timeout=(5.0, 30.0))
else:
url = f"{WHOLIX_BASE_URL}{path}"
headers = {"Authorization": f"Bearer {token}"}
res = req(url, method="POST", headers=headers, json_body=payload, timeout=(5.0, 30.0))
except Exception as e:
APP_LOG.warning(f"Wholix-Excludes Page {page} Fehler: {e}")
break
data_block = (res or {}).get("data") or {}
rows = data_block.get("data") or data_block.get("results") or []
if isinstance(data_block.get("last_page"), (int, float)):
last_page = int(data_block.get("last_page"))
else:
last_page = page
if not isinstance(rows, list) or not rows:
break
for row in rows:
ex = (row.get("exclude_hash") or row.get("excludeHash") or row.get("exclude_id") or "").strip()
if not ex:
continue
if dedupe and ex in seen:
continue
if dedupe:
seen.add(ex)
cname = (row.get("company_name") or row.get("companyName") or "").strip()
out.append({"exclude_hash": ex, "company_name": cname})
return out
# ========================== Job Management =================================
EXEC = ThreadPoolExecutor(max_workers=MAX_WORKERS)
JOBS: Dict[str, Dict[str, Any]] = {}
def _job_init(job_id: str):
JOBS[job_id] = {
"log": [],
"progress": 0,
"rows": [],
"done": False,
"error": None,
"lock": threading.RLock(),
"created_at": time.time(),
"finished_at": None,
}
def _job_emit(job_id: str, msg: str = None, progress: Optional[int] = None, rows_append: Optional[dict] = None):
st = JOBS.get(job_id)
if not st: return
lg = get_job_logger(job_id)
with st["lock"]:
if msg:
st["log"].append(msg)
st["log"] = st["log"][-1000:]
APP_LOG.info(f"[{job_id}] {msg}")
lg.info(msg)
if isinstance(progress, int):
st["progress"] = max(0, min(100, progress))
if rows_append:
st["rows"].append(rows_append)
st["rows"] = st["rows"][-1000:]
APP_LOG.info(f"[{job_id}] row: {rows_append}")
lg.info(f"row: {rows_append}")
def _job_finish(job_id: str, error: Optional[str] = None):
st = JOBS.get(job_id)
if not st: return
lg = get_job_logger(job_id)
with st["lock"]:
st["error"] = error
st["done"] = True
st["finished_at"] = time.time()
final = "✅ Fertig." if not error else f"❌ {error}"
st["log"].append(final)
APP_LOG.info(f"[{job_id}] {final}")
(lg.error if error else lg.info)(final)
def _gc_jobs():
now = time.time()
for jid, st in list(JOBS.items()):
if st.get("done") and st.get("finished_at") and now - st["finished_at"] > JOB_TTL_SEC:
JOBS.pop(jid, None)
# ========================== Pipeline (Background) ===========================
LEAD_COUNTS = [1, 2, 3, 4, 5, 10, 15, 20, 40, 80, 100, 200, 300, 400, 500, 1000]
CURL_DATA_RE = re.compile(r"""--data(?:-raw)?\s+(?P<q>['\"])(?P<body>.*?)(?P=q)""", re.DOTALL)
HDR_XTOKEN_RE = re.compile(r"""-H\s+(?P<q>['\"])X-Token-Id:\s*(?P<val>[^'\"]+)(?P=q)""", re.IGNORECASE)
def _find_data_quote_start(s: str) -> Tuple[int, Optional[str]]:
m = re.search(r"--data(?:-raw)?\s+(['\"])", s)
if not m:
return -1, None
return m.end(1), m.group(1)
def _scan_quoted_payload(s: str, i: int, q: str) -> Tuple[str, int]:
out = []
n = len(s)
while i < n:
ch = s[i]
if q == "'":
if i + 4 < n and s[i:i+5] == "'\"'\"'":
out.append("'"); i += 5; continue
if ch == "'":
return "".join(out), i + 1
out.append(ch); i += 1
else:
if ch == "\\" and i + 1 < n:
out.append(s[i+1]); i += 2; continue
if ch == '"':
return "".join(out), i + 1
out.append(ch); i += 1
return "".join(out), i
def parse_curl(curl_text: str) -> Tuple[str, Dict[str, Any]]:
"""
Extrahiert X-Token-Id und JSON aus einem curl mit --data-raw '...'.
Keine Feldumbenennungen, keine Heuristik – JSON wird 1:1 übernommen.
"""
import shlex
if not isinstance(curl_text, str) or not curl_text.strip():
raise ValueError("Leerer curl-Text.")
token_id = ""
m = re.search(r"""-H\s+(["'])X-Token-Id:\s*([^"']+)\1""", curl_text, re.IGNORECASE)
if m:
token_id = m.group(2).strip()
if not token_id:
m2 = re.search(r"""X-Token-Id:\s*([A-Za-z0-9\-\._]+)""", curl_text, re.IGNORECASE)
if m2:
token_id = m2.group(1).strip()
if not token_id:
env_token = os.getenv("X_TOKEN_ID", "").strip()
if env_token:
token_id = env_token
if not token_id:
raise ValueError("Konnte keinen X-Token-Id Header im curl (oder env X_TOKEN_ID) finden.")
qmatch = re.search(r"""--data(?:-raw)?\s+(['\"])""", curl_text)
if not qmatch:
raise ValueError("Konnte den JSON Body nicht finden (erwarte --data-raw '...').")
quote = qmatch.group(1)
start = qmatch.end(1)
out = []
i = start
n = len(curl_text)
if quote == "'":
while i < n:
if i + 4 < n and curl_text[i:i+5] == "'\"'\"'":
out.append("'")
i += 5
continue
ch = curl_text[i]
if ch == "'":
break
out.append(ch)
i += 1
else:
while i < n:
ch = curl_text[i]
if ch == "\\" and i + 1 < n:
out.append(curl_text[i+1])
i += 2
continue
if ch == '"':
break
out.append(ch)
i += 1
body_str = "".join(out).strip()
try:
payload = json.loads(body_str)
except json.JSONDecodeError:
payload = json.loads(body_str.replace("\r\n", "\n").replace("\r", "\n"))
return token_id, payload
def run_pipeline_bg(job_id: str, curl_text: str, n_leads_ui: int):
lg = get_job_logger(job_id)
APP_LOG.info(f"[{job_id}] Background job gestartet (n={n_leads_ui})")
lg.info(f"Background job gestartet (n={n_leads_ui})")
_gc_jobs()
_job_emit(job_id, "Job gestartet.")
try:
token_id, payload = parse_curl(curl_text)
except Exception:
logging.exception(f"[{job_id}] Parse-Fehler")
_job_finish(job_id, "Parse-Fehler: siehe Logs (Stacktrace).")
return
wh_email = payload.get("wholix_email") or payload.get("Wholix_email") or ""
wh_pass = payload.get("wholix_passwort") or payload.get("wholix_password") or ""
if not wh_email or not wh_pass:
_job_finish(job_id, "In der JSON-Payload fehlen wholix_email / wholix_passwort.")
return
# Globale Felder exakt wie im Curl (1:1)
filters = payload.get("filters") or {}
icp_text = payload.get("Produkt_und_Dienstleistungsbeschreibung") or ""
checklist = payload.get("Checkliste_Landingpage") or ""
signature = payload.get("Signatur") or ""
cta = payload.get("CTA") or ""
homepage_url = payload.get("icp_homepage_url") or ""
raw_tag = payload.get("Wholic_tag") or payload.get("Wholix_tag") or "AI"
# Lead-Anzahl
try:
n_leads = int(n_leads_ui)
except Exception:
n_leads = 1
n_leads = max(1, min(n_leads, MAX_LEADS))
# Fortschritt: 1 (Login) + pro Lead 4 Schritte
total_steps = (n_leads * 4) + 1
step = 0
# 1) Wholix Login
step += 1
_job_emit(job_id, "→ Wholix Login …", progress=int(step / total_steps * 100))
try:
wh_token = wholix_login(wh_email, wh_pass)
except Exception:
logging.exception(f"[{job_id}] Wholix-Login fehlgeschlagen")
_job_finish(job_id, "Wholix-Login fehlgeschlagen (Details in Logs).")
return
# ======= EXCLUDES LADEN / MERGEN =======================================
initial_excludes = set()
try:
for x in (payload.get("exclude_ids") or []):
if x: initial_excludes.add(str(x).strip())
except Exception:
pass
_job_emit(job_id, f"→ Wholix-Excludes laden … (Seed: {len(initial_excludes)})")
try:
wh_excludes = wholix_fetch_excludes(wh_token) # paginiert
for row in wh_excludes:
ex = (row.get("exclude_hash") or "").strip()
if ex:
initial_excludes.add(ex)
_job_emit(job_id, f" {len(wh_excludes)} Excludes aus Wholix, gesamt {len(initial_excludes)}")
except Exception:
logging.exception(f"[{job_id}] Wholix-Excludes laden fehlgeschlagen")
_job_emit(job_id, " Excludes laden übersprungen (Fehler).")
excludes: List[str] = list(initial_excludes)
excludes_set: Set[str] = set(initial_excludes)
# ---- Helper zum sicheren Auslesen echter Felder (ohne Raten) -----------
def pick(d: dict, *keys):
for k in keys:
if isinstance(d, dict) and k in d and d[k] not in (None, "", [], {}):
return d[k]
return None
def norm_tags(raw) -> dict:
if raw is None:
return {"keys": [], "values": []}
vals: List[str] = []
if isinstance(raw, str):
s = raw.strip()
if (s.startswith("[") and s.endswith("]")):
try:
arr = json.loads(s)
if isinstance(arr, list):
vals = [str(x).strip() for x in arr if str(x).strip()]
else:
vals = [s.strip("[] ").strip().strip("'\"")]
except Exception:
vals = [s.strip("[] ").strip().strip("'\"")]
else:
vals = [s]
elif isinstance(raw, (list, tuple, set)):
vals = [str(x).strip() for x in raw if str(x).strip()]
else:
vals = [str(raw).strip()]
vals = [v for v in vals if v]
return {"keys": vals, "values": vals}
def norm_departments(v) -> str:
if v is None:
return None
if isinstance(v, (list, tuple, set)):
return ", ".join(str(x).strip() for x in v if str(x).strip()) or None
s = str(v).strip()
if s.startswith("[") and s.endswith("]"):
s = s[1:-1].strip().strip("'\"")
return s or None
def norm_url(u: Any) -> Optional[str]:
return _normalize_url(u)
for i in range(1, n_leads + 1):
try:
# 2) Lead holen
step += 1
_job_emit(job_id, f"→ [{i}/{n_leads}] Lead vorschlagen … (excludes={len(excludes_set)})",
progress=int(step / total_steps * 100))
try:
lead = suggest_single_lead(token_id, filters, icp_text, excludes)
except Exception:
logging.exception(f"[{job_id}] Lead-Fehler")
_job_emit(job_id, "❌ Lead-Fehler: siehe Logs")
continue
if not isinstance(lead, dict):
_job_emit(job_id, "❌ Ungültige Lead-Struktur")
continue
person = (lead.get("person") or {})
company = (lead.get("company") or {})
combined_id = str(lead.get("combined_id") or "").strip()
if combined_id and combined_id in excludes_set:
_job_emit(job_id, f" ⚠️ Lead übersprungen (bereits excluded): {combined_id}")
continue
_job_emit(job_id, f" Lead: {person.get('first_name','?')} {person.get('last_name','?')} @ {company.get('name') or company.get('company_name','?')}")
# 3) Nachricht generieren (echte Endpoint-Daten)
step += 1
_job_emit(job_id, " → Nachricht generieren …", progress=int(step / total_steps * 100))
items = [{"combined_id": combined_id, "company": company, "person": person}]
variables = {
"Produkt_und_Dienstleistungsbeschreibung": icp_text,
"CTA": cta,
"Signatur": signature,
"Checkliste_Landingpage": checklist,
"homepage_url": homepage_url, # optional
"tags": raw_tag, # kommt als "[AI]" → später normiert
"Touch_Point": "LinkedIn DM", # optional
}
try:
draft = email_generate_async(token_id, variables, items)
except Exception:
logging.exception(f"[{job_id}] Email-Generate-Fehler")
_job_emit(job_id, "❌ Email-Generate-Fehler: siehe Logs")
continue
subj_raw = (draft.get("email") or {}).get("subject", "")
body_raw = (draft.get("email") or {}).get("body", "")
subj_flat = _flatten_text(subj_raw)
body_flat = _flatten_text(body_raw)
_job_emit(job_id, f" ✉️ Subject: {subj_flat}")
_job_emit(job_id, f" ✉️ Message: {body_flat}")
# 4) Speichern in Wholix (IMMER – auch ohne echte E-Mail)
step += 1
_job_emit(job_id, " → Speichere in Wholix …", progress=int(step / total_steps * 100))
email_to = str(person.get("email") or "").strip()
try:
# --- Felder befüllen NUR wenn real vorhanden -----------------
firstname = pick(person, "first_name")
lastname = pick(person, "last_name")
job_title = pick(person, "job_title", "title", "position")
departments = norm_departments(pick(person, "departments"))
linkedin = pick(person, "linkedin_url")
phonenumber = pick(person, "phone", "phonenumber") or pick(company, "phone", "company_phone", "phonenumber")
adress = pick(person, "adress", "address", "street") or pick(company, "adress", "address", "street")
city = pick(person, "city") or pick(company, "city")
postcode = pick(person, "postcode", "postal_code", "zip") or pick(company, "postcode", "postal_code", "zip")
company_name = pick(company, "name", "company_name")
company_url = norm_url(pick(company, "url", "website_url", "website", "domain"))
tags_ms = norm_tags(raw_tag)
# Wenn keine echte E-Mail: Tag ergänzen
if not email_to:
tags_ms = {"keys": list({*tags_ms.get("keys", []), "no-email"}),
"values": list({*tags_ms.get("values", []), "no-email"})}
record = {
"firstname": firstname,
"lastname": lastname,
"email": email_to or None, # kann None sein → Platzhalter wird im Store erzeugt
"phonenumber": phonenumber,
"adress": adress,
"city": city,
"postcode": postcode,
"job_title": job_title,
"departments": departments,
"linkedin_url": linkedin,
"company_name": company_name,
"company_url": company_url,
"message_mail_subject": subj_raw or None,
"message_mail": body_raw or None,
"message_followup1": draft.get("followup1") or None,
"message_followup2": draft.get("followup2") or None,
"exclude_hash": combined_id or None,
"status_field": {"keys": ["Kontakt aufgenommen"], "values": ["Kontakt aufgenommen"]},
"tags": tags_ms,
}
store_res = wholix_store_contact(wh_token, record, allow_placeholder=True)
stored_ok = bool(store_res)
if not email_to:
_job_emit(job_id, " ℹ️ Keine echte E-Mail – Platzhalter gespeichert und Tag 'no-email' gesetzt.")
except Exception:
logging.exception(f"[{job_id}] Wholix-Store-Fehler")
_job_emit(job_id, "❌ Wholix-Store-Fehler: siehe Logs")
stored_ok = False
if combined_id:
excludes_set.add(combined_id)
excludes.append(combined_id)
_job_emit(job_id, rows_append={
"person": f"{person.get('first_name','')} {person.get('last_name','')}".strip(),
"email": email_to or "(placeholder)",
"company": company.get("name") or company.get("company_name") or "",
"subject": subj_flat,
"message": body_flat,
"stored_ok": stored_ok,
})
# 5) Abschluss
step += 1
_job_emit(job_id, " ✓ Lead abgeschlossen.", progress=int(min(99, step / total_steps * 100)))
except Exception:
logging.exception(f"[{job_id}] Unerwarteter Fehler im Lead-Durchlauf")
_job_emit(job_id, "❌ Unerwarteter Fehler – Details in Logs.")
_job_emit(job_id, progress=100)
_job_finish(job_id, None)
# ================================ UI =======================================
def build_ui():
with gr.Blocks(theme=gr.themes.Soft(), css="""
.logbox textarea { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace; font-size: 12.5px; line-height: 1.35; }
""") as demo:
gr.Markdown("## Wholix Lead → Message → Store (rein asynchron, robust für lange Jobs)")
gr.Markdown(
"Füge deinen kompletten **`curl`** (mit `X-Token-Id` und JSON `--data-raw`) ein, wähle die Anzahl Leads und klicke **Start**. "
"Die Verarbeitung läuft serverseitig weiter – selbst wenn der Browser/Tab schließt. "
"Mit **Aktualisieren** holst du den aktuellen Status ab."
)
with gr.Row():
curl_in = gr.Textbox(
label="curl Befehl",
placeholder="curl -sS -N -X POST 'https://.../stream' -H 'X-Token-Id: ...' --data-raw '{...}'",
lines=12
)
with gr.Row():
count = gr.Dropdown(choices=[str(x) for x in LEAD_COUNTS], value="1", label="Anzahl Leads")
with gr.Row():
start_btn = gr.Button("🚀 Start", variant="primary")
poll_btn = gr.Button("🔄 Aktualisieren")
with gr.Row():
job_id_tb = gr.Textbox(label="Job-ID", interactive=True)
with gr.Row():
status = gr.Textbox(label="Status / Log", lines=18, interactive=False, elem_classes=["logbox"])
with gr.Row():
progress = gr.Slider(label="Progress", minimum=0, maximum=100, value=0, interactive=False)
with gr.Row():
out = gr.Dataframe(
headers=["person", "email", "company", "subject", "message", "stored_ok"],
label="Ergebnisse",
interactive=False,
wrap=True,
row_count=(0, "dynamic"),
col_count=(6, "fixed"),
)
def start_job(curl_text: str, n: str):
try:
n_int = int(n)
except Exception:
n_int = 1
n_int = max(1, min(n_int, MAX_LEADS))
job_id = str(uuid.uuid4())
_job_init(job_id)
_job_emit(job_id, f"Job gestartet: {job_id}")
EXEC.submit(run_pipeline_bg, job_id, curl_text, n_int)
st = JOBS[job_id]
with st["lock"]:
log = "\n".join(st["log"])
prog = int(st["progress"])
rows = st["rows"]
return log, prog, rows, job_id
def poll_job(job_id: str):
st = JOBS.get(job_id)
if not st:
return "Unbekannte oder abgelaufene Job-ID.", 0, []
with st["lock"]:
log = "\n".join(st["log"][-500:])
prog = int(st["progress"])
rows = st["rows"]
return log, prog, rows
start_btn.click(start_job, inputs=[curl_in, count],
outputs=[status, progress, out, job_id_tb])
poll_btn.click(poll_job, inputs=[job_id_tb],
outputs=[status, progress, out])
return demo
# ============================== MAIN =======================================
if __name__ == "__main__":
app = build_ui()
app.launch(server_name="0.0.0.0", debug=True, share=True)